• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# SPDX-License-Identifier: GPL-2.0-only
2# This file is part of Scapy
3# See https://scapy.net/ for more information
4# Copyright (C) Nils Weiss <nils@we155.de>
5
6# scapy.contrib.description = Unified Diagnostic Service (UDS)
7# scapy.contrib.status = loads
8
9"""
10UDS
11"""
12
13import struct
14from collections import defaultdict
15
16from scapy.fields import ByteEnumField, StrField, ConditionalField, \
17    BitEnumField, BitField, XByteField, FieldListField, \
18    XShortField, X3BytesField, XIntField, ByteField, \
19    ShortField, ObservableDict, XShortEnumField, XByteEnumField, StrLenField, \
20    FieldLenField, XStrFixedLenField, XStrLenField, FlagsField, PacketListField, \
21    PacketField
22from scapy.packet import Packet, bind_layers, NoPayload, Raw
23from scapy.config import conf
24from scapy.error import log_loading
25from scapy.utils import PeriodicSenderThread
26from scapy.contrib.isotp import ISOTP
27
28# Typing imports
29from typing import (
30    Dict,
31    Union,
32)
33
34try:
35    if conf.contribs['UDS']['treat-response-pending-as-answer']:
36        pass
37except KeyError:
38    log_loading.info("Specify \"conf.contribs['UDS'] = "
39                     "{'treat-response-pending-as-answer': True}\" to treat "
40                     "a negative response 'requestCorrectlyReceived-"
41                     "ResponsePending' as answer of a request. \n"
42                     "The default value is False.")
43    conf.contribs['UDS'] = {'treat-response-pending-as-answer': False}
44
45
46conf.debug_dissector = True
47
48
49class UDS(ISOTP):
50    services = ObservableDict(
51        {0x10: 'DiagnosticSessionControl',
52         0x11: 'ECUReset',
53         0x14: 'ClearDiagnosticInformation',
54         0x19: 'ReadDTCInformation',
55         0x22: 'ReadDataByIdentifier',
56         0x23: 'ReadMemoryByAddress',
57         0x24: 'ReadScalingDataByIdentifier',
58         0x27: 'SecurityAccess',
59         0x28: 'CommunicationControl',
60         0x29: 'Authentication',
61         0x2A: 'ReadDataPeriodicIdentifier',
62         0x2C: 'DynamicallyDefineDataIdentifier',
63         0x2E: 'WriteDataByIdentifier',
64         0x2F: 'InputOutputControlByIdentifier',
65         0x31: 'RoutineControl',
66         0x34: 'RequestDownload',
67         0x35: 'RequestUpload',
68         0x36: 'TransferData',
69         0x37: 'RequestTransferExit',
70         0x38: 'RequestFileTransfer',
71         0x3D: 'WriteMemoryByAddress',
72         0x3E: 'TesterPresent',
73         0x50: 'DiagnosticSessionControlPositiveResponse',
74         0x51: 'ECUResetPositiveResponse',
75         0x54: 'ClearDiagnosticInformationPositiveResponse',
76         0x59: 'ReadDTCInformationPositiveResponse',
77         0x62: 'ReadDataByIdentifierPositiveResponse',
78         0x63: 'ReadMemoryByAddressPositiveResponse',
79         0x64: 'ReadScalingDataByIdentifierPositiveResponse',
80         0x67: 'SecurityAccessPositiveResponse',
81         0x68: 'CommunicationControlPositiveResponse',
82         0x69: 'AuthenticationPositiveResponse',
83         0x6A: 'ReadDataPeriodicIdentifierPositiveResponse',
84         0x6C: 'DynamicallyDefineDataIdentifierPositiveResponse',
85         0x6E: 'WriteDataByIdentifierPositiveResponse',
86         0x6F: 'InputOutputControlByIdentifierPositiveResponse',
87         0x71: 'RoutineControlPositiveResponse',
88         0x74: 'RequestDownloadPositiveResponse',
89         0x75: 'RequestUploadPositiveResponse',
90         0x76: 'TransferDataPositiveResponse',
91         0x77: 'RequestTransferExitPositiveResponse',
92         0x78: 'RequestFileTransferPositiveResponse',
93         0x7D: 'WriteMemoryByAddressPositiveResponse',
94         0x7E: 'TesterPresentPositiveResponse',
95         0x83: 'AccessTimingParameter',
96         0x84: 'SecuredDataTransmission',
97         0x85: 'ControlDTCSetting',
98         0x86: 'ResponseOnEvent',
99         0x87: 'LinkControl',
100         0xC3: 'AccessTimingParameterPositiveResponse',
101         0xC4: 'SecuredDataTransmissionPositiveResponse',
102         0xC5: 'ControlDTCSettingPositiveResponse',
103         0xC6: 'ResponseOnEventPositiveResponse',
104         0xC7: 'LinkControlPositiveResponse',
105         0x7f: 'NegativeResponse'})  # type: Dict[int, str]
106    name = 'UDS'
107    fields_desc = [
108        XByteEnumField('service', 0, services)
109    ]
110
111    def answers(self, other):
112        # type: (Union[UDS, Packet]) -> bool
113        if other.__class__ != self.__class__:
114            return False
115        if self.service == 0x7f:
116            return self.payload.answers(other)
117        if self.service == (other.service + 0x40):
118            if isinstance(self.payload, NoPayload) or \
119                    isinstance(other.payload, NoPayload):
120                return len(self) <= len(other)
121            else:
122                return self.payload.answers(other.payload)
123        return False
124
125    def hashret(self):
126        # type: () -> bytes
127        if self.service == 0x7f and len(self) >= 3:
128            return struct.pack('B', bytes(self)[1] & ~0x40)
129        return struct.pack('B', self.service & ~0x40)
130
131
132# ########################DSC###################################
133class UDS_DSC(Packet):
134    diagnosticSessionTypes = ObservableDict({
135        0x00: 'ISOSAEReserved',
136        0x01: 'defaultSession',
137        0x02: 'programmingSession',
138        0x03: 'extendedDiagnosticSession',
139        0x04: 'safetySystemDiagnosticSession',
140        0x7F: 'ISOSAEReserved'})
141    name = 'DiagnosticSessionControl'
142    fields_desc = [
143        ByteEnumField('diagnosticSessionType', 0, diagnosticSessionTypes)
144    ]
145
146
147bind_layers(UDS, UDS_DSC, service=0x10)
148
149
150class UDS_DSCPR(Packet):
151    name = 'DiagnosticSessionControlPositiveResponse'
152    fields_desc = [
153        ByteEnumField('diagnosticSessionType', 0,
154                      UDS_DSC.diagnosticSessionTypes),
155        StrField('sessionParameterRecord', b"")
156    ]
157
158    def answers(self, other):
159        return isinstance(other, UDS_DSC) and \
160            other.diagnosticSessionType == self.diagnosticSessionType
161
162
163bind_layers(UDS, UDS_DSCPR, service=0x50)
164
165
166# #########################ER###################################
167class UDS_ER(Packet):
168    resetTypes = {
169        0x00: 'ISOSAEReserved',
170        0x01: 'hardReset',
171        0x02: 'keyOffOnReset',
172        0x03: 'softReset',
173        0x04: 'enableRapidPowerShutDown',
174        0x05: 'disableRapidPowerShutDown',
175        0x41: 'powerDown',
176        0x7F: 'ISOSAEReserved'}
177    name = 'ECUReset'
178    fields_desc = [
179        ByteEnumField('resetType', 0, resetTypes)
180    ]
181
182
183bind_layers(UDS, UDS_ER, service=0x11)
184
185
186class UDS_ERPR(Packet):
187    name = 'ECUResetPositiveResponse'
188    fields_desc = [
189        ByteEnumField('resetType', 0, UDS_ER.resetTypes),
190        ConditionalField(ByteField('powerDownTime', 0),
191                         lambda pkt: pkt.resetType == 0x04)
192    ]
193
194    def answers(self, other):
195        return isinstance(other, UDS_ER) and other.resetType == self.resetType
196
197
198bind_layers(UDS, UDS_ERPR, service=0x51)
199
200
201# #########################SA###################################
202class UDS_SA(Packet):
203    name = 'SecurityAccess'
204    fields_desc = [
205        ByteField('securityAccessType', 0),
206        ConditionalField(StrField('securityAccessDataRecord', b""),
207                         lambda pkt: pkt.securityAccessType % 2 == 1),
208        ConditionalField(StrField('securityKey', b""),
209                         lambda pkt: pkt.securityAccessType % 2 == 0)
210    ]
211
212
213bind_layers(UDS, UDS_SA, service=0x27)
214
215
216class UDS_SAPR(Packet):
217    name = 'SecurityAccessPositiveResponse'
218    fields_desc = [
219        ByteField('securityAccessType', 0),
220        ConditionalField(StrField('securitySeed', b""),
221                         lambda pkt: pkt.securityAccessType % 2 == 1),
222    ]
223
224    def answers(self, other):
225        return isinstance(other, UDS_SA) \
226            and other.securityAccessType == self.securityAccessType
227
228
229bind_layers(UDS, UDS_SAPR, service=0x67)
230
231
232# #########################CC###################################
233class UDS_CC(Packet):
234    controlTypes = {
235        0x00: 'enableRxAndTx',
236        0x01: 'enableRxAndDisableTx',
237        0x02: 'disableRxAndEnableTx',
238        0x03: 'disableRxAndTx'
239    }
240    name = 'CommunicationControl'
241    fields_desc = [
242        ByteEnumField('controlType', 0, controlTypes),
243        BitEnumField('communicationType0', 0, 2,
244                     {0: 'ISOSAEReserved',
245                      1: 'normalCommunicationMessages',
246                      2: 'networkManagmentCommunicationMessages',
247                      3: 'networkManagmentCommunicationMessages and '
248                         'normalCommunicationMessages'}),
249        BitField('communicationType1', 0, 2),
250        BitEnumField('communicationType2', 0, 4,
251                     {0: 'Disable/Enable specified communication Type',
252                      1: 'Disable/Enable specific subnet',
253                      2: 'Disable/Enable specific subnet',
254                      3: 'Disable/Enable specific subnet',
255                      4: 'Disable/Enable specific subnet',
256                      5: 'Disable/Enable specific subnet',
257                      6: 'Disable/Enable specific subnet',
258                      7: 'Disable/Enable specific subnet',
259                      8: 'Disable/Enable specific subnet',
260                      9: 'Disable/Enable specific subnet',
261                      10: 'Disable/Enable specific subnet',
262                      11: 'Disable/Enable specific subnet',
263                      12: 'Disable/Enable specific subnet',
264                      13: 'Disable/Enable specific subnet',
265                      14: 'Disable/Enable specific subnet',
266                      15: 'Disable/Enable network'})
267    ]
268
269
270bind_layers(UDS, UDS_CC, service=0x28)
271
272
273class UDS_CCPR(Packet):
274    name = 'CommunicationControlPositiveResponse'
275    fields_desc = [
276        ByteEnumField('controlType', 0, UDS_CC.controlTypes)
277    ]
278
279    def answers(self, other):
280        return isinstance(other, UDS_CC) \
281            and other.controlType == self.controlType
282
283
284bind_layers(UDS, UDS_CCPR, service=0x68)
285
286
287# #########################AUTH###################################
288class UDS_AUTH(Packet):
289    subFunctions = {
290        0x00: 'deAuthenticate',
291        0x01: 'verifyCertificateUnidirectional',
292        0x02: 'verifyCertificateBidirectional',
293        0x03: 'proofOfOwnership',
294        0x04: 'transmitCertificate',
295        0x05: 'requestChallengeForAuthentication',
296        0x06: 'verifyProofOfOwnershipUnidirectional',
297        0x07: 'verifyProofOfOwnershipBidirectional',
298        0x08: 'authenticationConfiguration',
299        0x7F: 'ISOSAEReserved'
300    }
301    name = "Authentication"
302    fields_desc = [
303        ByteEnumField('subFunction', 0, subFunctions),
304        ConditionalField(XByteField('communicationConfiguration', 0),
305                         lambda pkt: pkt.subFunction in [0x01, 0x02, 0x5]),
306        ConditionalField(XShortField('certificateEvaluationId', 0),
307                         lambda pkt: pkt.subFunction == 0x04),
308        ConditionalField(XStrFixedLenField('algorithmIndicator', 0, length=16),
309                         lambda pkt: pkt.subFunction in [0x05, 0x06, 0x07]),
310        ConditionalField(FieldLenField('lengthOfCertificateClient', None,
311                                       fmt="H", length_of='certificateClient'),
312                         lambda pkt: pkt.subFunction in [0x01, 0x02]),
313        ConditionalField(XStrLenField('certificateClient', b"",
314                                      length_from=lambda p:
315                                      p.lengthOfCertificateClient),
316                         lambda pkt: pkt.subFunction in [0x01, 0x02]),
317        ConditionalField(FieldLenField('lengthOfProofOfOwnershipClient', None,
318                                       fmt="H",
319                                       length_of='proofOfOwnershipClient'),
320                         lambda pkt: pkt.subFunction in [0x03, 0x06, 0x07]),
321        ConditionalField(XStrLenField('proofOfOwnershipClient', b"",
322                                      length_from=lambda p:
323                                      p.lengthOfProofOfOwnershipClient),
324                         lambda pkt: pkt.subFunction in [0x03, 0x06, 0x07]),
325        ConditionalField(FieldLenField('lengthOfChallengeClient', None,
326                                       fmt="H", length_of='challengeClient'),
327                         lambda pkt: pkt.subFunction in [0x01, 0x02, 0x06,
328                                                         0x07]),
329        ConditionalField(XStrLenField('challengeClient', b"",
330                                      length_from=lambda p:
331                                      p.lengthOfChallengeClient),
332                         lambda pkt: pkt.subFunction in [0x01, 0x02, 0x06,
333                                                         0x07]),
334        ConditionalField(FieldLenField('lengthOfEphemeralPublicKeyClient',
335                                       None, fmt="H",
336                                       length_of='ephemeralPublicKeyClient'),
337                         lambda pkt: pkt.subFunction == 0x03),
338        ConditionalField(XStrLenField('ephemeralPublicKeyClient', b"",
339                                      length_from=lambda p:
340                                      p.lengthOfEphemeralPublicKeyClient),
341                         lambda pkt: pkt.subFunction == 0x03),
342        ConditionalField(FieldLenField('lengthOfCertificateData', None,
343                                       fmt="H", length_of='certificateData'),
344                         lambda pkt: pkt.subFunction == 0x04),
345        ConditionalField(XStrLenField('certificateData', b"",
346                                      length_from=lambda p:
347                                      p.lengthOfCertificateData),
348                         lambda pkt: pkt.subFunction == 0x04),
349        ConditionalField(FieldLenField('lengthOfAdditionalParameter', None,
350                                       fmt="H",
351                                       length_of='additionalParameter'),
352                         lambda pkt: pkt.subFunction in [0x06, 0x07]),
353        ConditionalField(XStrLenField('additionalParameter', b"",
354                                      length_from=lambda p:
355                                      p.lengthOfAdditionalParameter),
356                         lambda pkt: pkt.subFunction in [0x06, 0x07]),
357    ]
358
359
360bind_layers(UDS, UDS_AUTH, service=0x29)
361
362
363class UDS_AUTHPR(Packet):
364    authenticationReturnParameterTypes = {
365        0x00: 'requestAccepted',
366        0x01: 'generalReject',
367        # Authentication with PKI Certificate Exchange (ACPE)
368        0x02: 'authenticationConfigurationAPCE',
369        # Authentication with Challenge-Response (ACR)
370        0x03: 'authenticationConfigurationACRWithAsymmetricCryptography',
371        0x04: 'authenticationConfigurationACRWithSymmetricCryptography',
372        0x05: 'ISOSAEReserved',
373        0x0F: 'ISOSAEReserved',
374        0x10: 'deAuthenticationSuccessful',
375        0x11: 'certificateVerifiedOwnershipVerificationNecessary',
376        0x12: 'ownershipVerifiedAuthenticationComplete',
377        0x13: 'certificateVerified',
378        0x14: 'ISOSAEReserved',
379        0x9F: 'ISOSAEReserved',
380        0xFF: 'ISOSAEReserved'
381    }
382    name = 'AuthenticationPositiveResponse'
383    fields_desc = [
384        ByteEnumField('subFunction', 0, UDS_AUTH.subFunctions),
385        ByteEnumField('returnValue', 0, authenticationReturnParameterTypes),
386        ConditionalField(XStrFixedLenField('algorithmIndicator', 0, length=16),
387                         lambda pkt: pkt.subFunction in [0x05, 0x06, 0x07]),
388        ConditionalField(FieldLenField('lengthOfChallengeServer', None,
389                                       fmt="H", length_of='challengeServer'),
390                         lambda pkt: pkt.subFunction in [0x01, 0x02, 0x05]),
391        ConditionalField(XStrLenField('challengeServer', b"",
392                                      length_from=lambda p:
393                                      p.lengthOfChallengeServer),
394                         lambda pkt: pkt.subFunction in [0x01, 0x02, 0x05]),
395        ConditionalField(FieldLenField('lengthOfCertificateServer', None,
396                                       fmt="H", length_of='certificateServer'),
397                         lambda pkt: pkt.subFunction == 0x02),
398        ConditionalField(XStrLenField('certificateServer', b"",
399                                      length_from=lambda p:
400                                      p.lengthOfCertificateServer),
401                         lambda pkt: pkt.subFunction == 0x02),
402        ConditionalField(FieldLenField('lengthOfProofOfOwnershipServer', None,
403                                       fmt="H",
404                                       length_of='proofOfOwnershipServer'),
405                         lambda pkt: pkt.subFunction in [0x02, 0x07]),
406        ConditionalField(XStrLenField('proofOfOwnershipServer', b"",
407                                      length_from=lambda p:
408                                      p.lengthOfProofOfOwnershipServer),
409                         lambda pkt: pkt.subFunction in [0x02, 0x07]),
410        ConditionalField(FieldLenField('lengthOfSessionKeyInfo', None, fmt="H",
411                                       length_of='sessionKeyInfo'),
412                         lambda pkt: pkt.subFunction in [0x03, 0x06, 0x07]),
413        ConditionalField(XStrLenField('sessionKeyInfo', b"",
414                                      length_from=lambda p:
415                                      p.lengthOfSessionKeyInfo),
416                         lambda pkt: pkt.subFunction in [0x03, 0x06, 0x07]),
417        ConditionalField(FieldLenField('lengthOfEphemeralPublicKeyServer',
418                                       None, fmt="H",
419                                       length_of='ephemeralPublicKeyServer'),
420                         lambda pkt: pkt.subFunction in [0x01, 0x02]),
421        ConditionalField(XStrLenField('ephemeralPublicKeyServer', b"",
422                                      length_from=lambda p:
423                                      p.lengthOfEphemeralPublicKeyServer),
424                         lambda pkt: pkt.subFunction in [0x1, 0x02]),
425        ConditionalField(FieldLenField('lengthOfNeededAdditionalParameter',
426                                       None, fmt="H",
427                                       length_of='neededAdditionalParameter'),
428                         lambda pkt: pkt.subFunction == 0x05),
429        ConditionalField(XStrLenField('neededAdditionalParameter', b"",
430                                      length_from=lambda p:
431                                      p.lengthOfNeededAdditionalParameter),
432                         lambda pkt: pkt.subFunction == 0x05),
433    ]
434
435    def answers(self, other):
436        return isinstance(other, UDS_AUTH) \
437            and other.subFunction == self.subFunction
438
439
440bind_layers(UDS, UDS_AUTHPR, service=0x69)
441
442
443# #########################TP###################################
444class UDS_TP(Packet):
445    name = 'TesterPresent'
446    fields_desc = [
447        ByteField('subFunction', 0)
448    ]
449
450
451bind_layers(UDS, UDS_TP, service=0x3E)
452
453
454class UDS_TPPR(Packet):
455    name = 'TesterPresentPositiveResponse'
456    fields_desc = [
457        ByteField('zeroSubFunction', 0)
458    ]
459
460    def answers(self, other):
461        return isinstance(other, UDS_TP)
462
463
464bind_layers(UDS, UDS_TPPR, service=0x7E)
465
466
467# #########################ATP###################################
468class UDS_ATP(Packet):
469    timingParameterAccessTypes = {
470        0: 'ISOSAEReserved',
471        1: 'readExtendedTimingParameterSet',
472        2: 'setTimingParametersToDefaultValues',
473        3: 'readCurrentlyActiveTimingParameters',
474        4: 'setTimingParametersToGivenValues'
475    }
476    name = 'AccessTimingParameter'
477    fields_desc = [
478        ByteEnumField('timingParameterAccessType', 0,
479                      timingParameterAccessTypes),
480        ConditionalField(StrField('timingParameterRequestRecord', b""),
481                         lambda pkt: pkt.timingParameterAccessType == 0x4)
482    ]
483
484
485bind_layers(UDS, UDS_ATP, service=0x83)
486
487
488class UDS_ATPPR(Packet):
489    name = 'AccessTimingParameterPositiveResponse'
490    fields_desc = [
491        ByteEnumField('timingParameterAccessType', 0,
492                      UDS_ATP.timingParameterAccessTypes),
493        ConditionalField(StrField('timingParameterResponseRecord', b""),
494                         lambda pkt: pkt.timingParameterAccessType == 0x3)
495    ]
496
497    def answers(self, other):
498        return isinstance(other, UDS_ATP) \
499            and other.timingParameterAccessType == \
500            self.timingParameterAccessType
501
502
503bind_layers(UDS, UDS_ATPPR, service=0xC3)
504
505
506# #########################SDT###################################
507# TODO: Implement correct internal message service handling here,
508# instead of using just the dataRecord
509class UDS_SDT(Packet):
510    name = 'SecuredDataTransmission'
511    fields_desc = [
512        BitField('requestMessage', 0, 1),
513        BitField('ISOSAEReservedBackwardsCompatibility', 0, 2),
514        BitField('preEstablishedKeyUsed', 0, 1),
515        BitField('encryptedMessage', 0, 1),
516        BitField('signedMessage', 0, 1),
517        BitField('signedResponseRequested', 0, 1),
518        BitField('ISOSAEReserved', 0, 9),
519        ByteField('signatureEncryptionCalculation', 0),
520        XShortField('signatureLength', 0),
521        XShortField('antiReplayCounter', 0),
522        ByteField('internalMessageServiceRequestId', 0),
523        StrField('dataRecord', b"", fmt="B")
524    ]
525
526
527bind_layers(UDS, UDS_SDT, service=0x84)
528
529
530class UDS_SDTPR(Packet):
531    name = 'SecuredDataTransmissionPositiveResponse'
532    fields_desc = [
533        BitField('requestMessage', 0, 1),
534        BitField('ISOSAEReservedBackwardsCompatibility', 0, 2),
535        BitField('preEstablishedKeyUsed', 0, 1),
536        BitField('encryptedMessage', 0, 1),
537        BitField('signedMessage', 0, 1),
538        BitField('signedResponseRequested', 0, 1),
539        BitField('ISOSAEReserved', 0, 9),
540        ByteField('signatureEncryptionCalculation', 0),
541        XShortField('signatureLength', 0),
542        XShortField('antiReplayCounter', 0),
543        ByteField('internalMessageServiceResponseId', 0),
544        StrField('dataRecord', b"", fmt="B")
545    ]
546
547    def answers(self, other):
548        return isinstance(other, UDS_SDT)
549
550
551bind_layers(UDS, UDS_SDTPR, service=0xC4)
552
553
554# #########################CDTCS###################################
555class UDS_CDTCS(Packet):
556    DTCSettingTypes = {
557        0: 'ISOSAEReserved',
558        1: 'on',
559        2: 'off'
560    }
561    name = 'ControlDTCSetting'
562    fields_desc = [
563        ByteEnumField('DTCSettingType', 0, DTCSettingTypes),
564        StrField('DTCSettingControlOptionRecord', b"")
565    ]
566
567
568bind_layers(UDS, UDS_CDTCS, service=0x85)
569
570
571class UDS_CDTCSPR(Packet):
572    name = 'ControlDTCSettingPositiveResponse'
573    fields_desc = [
574        ByteEnumField('DTCSettingType', 0, UDS_CDTCS.DTCSettingTypes)
575    ]
576
577    def answers(self, other):
578        return isinstance(other, UDS_CDTCS)
579
580
581bind_layers(UDS, UDS_CDTCSPR, service=0xC5)
582
583
584# #########################ROE###################################
585# TODO: improve this protocol implementation
586class UDS_ROE(Packet):
587    eventTypes = {
588        0: 'doNotStoreEvent',
589        1: 'storeEvent'
590    }
591    name = 'ResponseOnEvent'
592    fields_desc = [
593        ByteEnumField('eventType', 0, eventTypes),
594        ByteField('eventWindowTime', 0),
595        StrField('eventTypeRecord', b"")
596    ]
597
598
599bind_layers(UDS, UDS_ROE, service=0x86)
600
601
602class UDS_ROEPR(Packet):
603    name = 'ResponseOnEventPositiveResponse'
604    fields_desc = [
605        ByteEnumField('eventType', 0, UDS_ROE.eventTypes),
606        ByteField('numberOfIdentifiedEvents', 0),
607        ByteField('eventWindowTime', 0),
608        StrField('eventTypeRecord', b"")
609    ]
610
611    def answers(self, other):
612        return isinstance(other, UDS_ROE) \
613            and other.eventType == self.eventType
614
615
616bind_layers(UDS, UDS_ROEPR, service=0xC6)
617
618
619# #########################LC###################################
620class UDS_LC(Packet):
621    linkControlTypes = {
622        0: 'ISOSAEReserved',
623        1: 'verifyBaudrateTransitionWithFixedBaudrate',
624        2: 'verifyBaudrateTransitionWithSpecificBaudrate',
625        3: 'transitionBaudrate'
626    }
627    name = 'LinkControl'
628    fields_desc = [
629        ByteEnumField('linkControlType', 0, linkControlTypes),
630        ConditionalField(ByteField('baudrateIdentifier', 0),
631                         lambda pkt: pkt.linkControlType == 0x1),
632        ConditionalField(ByteField('baudrateHighByte', 0),
633                         lambda pkt: pkt.linkControlType == 0x2),
634        ConditionalField(ByteField('baudrateMiddleByte', 0),
635                         lambda pkt: pkt.linkControlType == 0x2),
636        ConditionalField(ByteField('baudrateLowByte', 0),
637                         lambda pkt: pkt.linkControlType == 0x2)
638    ]
639
640
641bind_layers(UDS, UDS_LC, service=0x87)
642
643
644class UDS_LCPR(Packet):
645    name = 'LinkControlPositiveResponse'
646    fields_desc = [
647        ByteEnumField('linkControlType', 0, UDS_LC.linkControlTypes)
648    ]
649
650    def answers(self, other):
651        return isinstance(other, UDS_LC) \
652            and other.linkControlType == self.linkControlType
653
654
655bind_layers(UDS, UDS_LCPR, service=0xC7)
656
657
658# #########################RDBI###################################
659class UDS_RDBI(Packet):
660    dataIdentifiers = ObservableDict()
661    name = 'ReadDataByIdentifier'
662    fields_desc = [
663        FieldListField("identifiers", None,
664                       XShortEnumField('dataIdentifier', 0,
665                                       dataIdentifiers))
666    ]
667
668
669bind_layers(UDS, UDS_RDBI, service=0x22)
670
671
672class UDS_RDBIPR(Packet):
673    name = 'ReadDataByIdentifierPositiveResponse'
674    fields_desc = [
675        XShortEnumField('dataIdentifier', 0,
676                        UDS_RDBI.dataIdentifiers),
677    ]
678
679    def answers(self, other):
680        return isinstance(other, UDS_RDBI) \
681            and self.dataIdentifier in other.identifiers
682
683
684bind_layers(UDS, UDS_RDBIPR, service=0x62)
685
686
687# #########################RMBA###################################
688class UDS_RMBA(Packet):
689    name = 'ReadMemoryByAddress'
690    fields_desc = [
691        BitField('memorySizeLen', 0, 4),
692        BitField('memoryAddressLen', 0, 4),
693        ConditionalField(XByteField('memoryAddress1', 0),
694                         lambda pkt: pkt.memoryAddressLen == 1),
695        ConditionalField(XShortField('memoryAddress2', 0),
696                         lambda pkt: pkt.memoryAddressLen == 2),
697        ConditionalField(X3BytesField('memoryAddress3', 0),
698                         lambda pkt: pkt.memoryAddressLen == 3),
699        ConditionalField(XIntField('memoryAddress4', 0),
700                         lambda pkt: pkt.memoryAddressLen == 4),
701        ConditionalField(XByteField('memorySize1', 0),
702                         lambda pkt: pkt.memorySizeLen == 1),
703        ConditionalField(XShortField('memorySize2', 0),
704                         lambda pkt: pkt.memorySizeLen == 2),
705        ConditionalField(X3BytesField('memorySize3', 0),
706                         lambda pkt: pkt.memorySizeLen == 3),
707        ConditionalField(XIntField('memorySize4', 0),
708                         lambda pkt: pkt.memorySizeLen == 4),
709    ]
710
711
712bind_layers(UDS, UDS_RMBA, service=0x23)
713
714
715class UDS_RMBAPR(Packet):
716    name = 'ReadMemoryByAddressPositiveResponse'
717    fields_desc = [
718        StrField('dataRecord', b"", fmt="B")
719    ]
720
721    def answers(self, other):
722        return isinstance(other, UDS_RMBA)
723
724
725bind_layers(UDS, UDS_RMBAPR, service=0x63)
726
727
728# #########################RSDBI###################################
729class UDS_RSDBI(Packet):
730    name = 'ReadScalingDataByIdentifier'
731    dataIdentifiers = ObservableDict()
732    fields_desc = [
733        XShortEnumField('dataIdentifier', 0, dataIdentifiers)
734    ]
735
736
737bind_layers(UDS, UDS_RSDBI, service=0x24)
738
739
740# TODO: Implement correct scaling here, instead of using just the dataRecord
741class UDS_RSDBIPR(Packet):
742    name = 'ReadScalingDataByIdentifierPositiveResponse'
743    fields_desc = [
744        XShortEnumField('dataIdentifier', 0, UDS_RSDBI.dataIdentifiers),
745        ByteField('scalingByte', 0),
746        StrField('dataRecord', b"", fmt="B")
747    ]
748
749    def answers(self, other):
750        return isinstance(other, UDS_RSDBI) \
751            and other.dataIdentifier == self.dataIdentifier
752
753
754bind_layers(UDS, UDS_RSDBIPR, service=0x64)
755
756
757# #########################RDBPI###################################
758class UDS_RDBPI(Packet):
759    periodicDataIdentifiers = ObservableDict()
760    transmissionModes = {
761        0: 'ISOSAEReserved',
762        1: 'sendAtSlowRate',
763        2: 'sendAtMediumRate',
764        3: 'sendAtFastRate',
765        4: 'stopSending'
766    }
767    name = 'ReadDataByPeriodicIdentifier'
768    fields_desc = [
769        ByteEnumField('transmissionMode', 0, transmissionModes),
770        ByteEnumField('periodicDataIdentifier', 0, periodicDataIdentifiers),
771        StrField('furtherPeriodicDataIdentifier', b"", fmt="B")
772    ]
773
774
775bind_layers(UDS, UDS_RDBPI, service=0x2A)
776
777
778# TODO: Implement correct scaling here, instead of using just the dataRecord
779class UDS_RDBPIPR(Packet):
780    name = 'ReadDataByPeriodicIdentifierPositiveResponse'
781    fields_desc = [
782        ByteField('periodicDataIdentifier', 0),
783        StrField('dataRecord', b"", fmt="B")
784    ]
785
786    def answers(self, other):
787        return isinstance(other, UDS_RDBPI) \
788            and other.periodicDataIdentifier == self.periodicDataIdentifier
789
790
791bind_layers(UDS, UDS_RDBPIPR, service=0x6A)
792
793
794# #########################DDDI###################################
795# TODO: Implement correct interpretation here,
796# instead of using just the dataRecord
797class UDS_DDDI(Packet):
798    name = 'DynamicallyDefineDataIdentifier'
799    subFunctions = {0x1: "defineByIdentifier",
800                    0x2: "defineByMemoryAddress",
801                    0x3: "clearDynamicallyDefinedDataIdentifier"}
802    fields_desc = [
803        ByteEnumField('subFunction', 0, subFunctions),
804        StrField('dataRecord', b"", fmt="B")
805    ]
806
807
808bind_layers(UDS, UDS_DDDI, service=0x2C)
809
810
811class UDS_DDDIPR(Packet):
812    name = 'DynamicallyDefineDataIdentifierPositiveResponse'
813    fields_desc = [
814        ByteEnumField('subFunction', 0, UDS_DDDI.subFunctions),
815        XShortField('dynamicallyDefinedDataIdentifier', 0)
816    ]
817
818    def answers(self, other):
819        return isinstance(other, UDS_DDDI) \
820            and other.subFunction == self.subFunction
821
822
823bind_layers(UDS, UDS_DDDIPR, service=0x6C)
824
825
826# #########################WDBI###################################
827class UDS_WDBI(Packet):
828    name = 'WriteDataByIdentifier'
829    fields_desc = [
830        XShortEnumField('dataIdentifier', 0,
831                        UDS_RDBI.dataIdentifiers)
832    ]
833
834
835bind_layers(UDS, UDS_WDBI, service=0x2E)
836
837
838class UDS_WDBIPR(Packet):
839    name = 'WriteDataByIdentifierPositiveResponse'
840    fields_desc = [
841        XShortEnumField('dataIdentifier', 0,
842                        UDS_RDBI.dataIdentifiers),
843    ]
844
845    def answers(self, other):
846        return isinstance(other, UDS_WDBI) \
847            and other.dataIdentifier == self.dataIdentifier
848
849
850bind_layers(UDS, UDS_WDBIPR, service=0x6E)
851
852
853# #########################WMBA###################################
854class UDS_WMBA(Packet):
855    name = 'WriteMemoryByAddress'
856    fields_desc = [
857        BitField('memorySizeLen', 0, 4),
858        BitField('memoryAddressLen', 0, 4),
859        ConditionalField(XByteField('memoryAddress1', 0),
860                         lambda pkt: pkt.memoryAddressLen == 1),
861        ConditionalField(XShortField('memoryAddress2', 0),
862                         lambda pkt: pkt.memoryAddressLen == 2),
863        ConditionalField(X3BytesField('memoryAddress3', 0),
864                         lambda pkt: pkt.memoryAddressLen == 3),
865        ConditionalField(XIntField('memoryAddress4', 0),
866                         lambda pkt: pkt.memoryAddressLen == 4),
867        ConditionalField(XByteField('memorySize1', 0),
868                         lambda pkt: pkt.memorySizeLen == 1),
869        ConditionalField(XShortField('memorySize2', 0),
870                         lambda pkt: pkt.memorySizeLen == 2),
871        ConditionalField(X3BytesField('memorySize3', 0),
872                         lambda pkt: pkt.memorySizeLen == 3),
873        ConditionalField(XIntField('memorySize4', 0),
874                         lambda pkt: pkt.memorySizeLen == 4),
875        StrField('dataRecord', b'', fmt="B"),
876
877    ]
878
879
880bind_layers(UDS, UDS_WMBA, service=0x3D)
881
882
883class UDS_WMBAPR(Packet):
884    name = 'WriteMemoryByAddressPositiveResponse'
885    fields_desc = [
886        BitField('memorySizeLen', 0, 4),
887        BitField('memoryAddressLen', 0, 4),
888        ConditionalField(XByteField('memoryAddress1', 0),
889                         lambda pkt: pkt.memoryAddressLen == 1),
890        ConditionalField(XShortField('memoryAddress2', 0),
891                         lambda pkt: pkt.memoryAddressLen == 2),
892        ConditionalField(X3BytesField('memoryAddress3', 0),
893                         lambda pkt: pkt.memoryAddressLen == 3),
894        ConditionalField(XIntField('memoryAddress4', 0),
895                         lambda pkt: pkt.memoryAddressLen == 4),
896        ConditionalField(XByteField('memorySize1', 0),
897                         lambda pkt: pkt.memorySizeLen == 1),
898        ConditionalField(XShortField('memorySize2', 0),
899                         lambda pkt: pkt.memorySizeLen == 2),
900        ConditionalField(X3BytesField('memorySize3', 0),
901                         lambda pkt: pkt.memorySizeLen == 3),
902        ConditionalField(XIntField('memorySize4', 0),
903                         lambda pkt: pkt.memorySizeLen == 4)
904    ]
905
906    def answers(self, other):
907        return isinstance(other, UDS_WMBA) \
908            and other.memorySizeLen == self.memorySizeLen \
909            and other.memoryAddressLen == self.memoryAddressLen
910
911
912bind_layers(UDS, UDS_WMBAPR, service=0x7D)
913
914
915# ##########################DTC#####################################
916class DTC(Packet):
917    name = 'Diagnostic Trouble Code'
918    dtc_descriptions = {}  # Customize this dictionary for each individual ECU / OEM
919
920    fields_desc = [
921        BitEnumField("system", 0, 2, {
922            0: "Powertrain",
923            1: "Chassis",
924            2: "Body",
925            3: "Network"}),
926        BitEnumField("type", 0, 2, {
927            0: "Generic",
928            1: "ManufacturerSpecific",
929            2: "Generic",
930            3: "Generic"}),
931        BitField("numeric_value_code", 0, 12),
932        ByteField("additional_information_code", 0),
933    ]
934
935    def extract_padding(self, s):
936        return '', s
937
938
939# #########################CDTCI###################################
940class UDS_CDTCI(Packet):
941    name = 'ClearDiagnosticInformation'
942    fields_desc = [
943        ByteField('groupOfDTCHighByte', 0),
944        ByteField('groupOfDTCMiddleByte', 0),
945        ByteField('groupOfDTCLowByte', 0),
946    ]
947
948
949bind_layers(UDS, UDS_CDTCI, service=0x14)
950
951
952class UDS_CDTCIPR(Packet):
953    name = 'ClearDiagnosticInformationPositiveResponse'
954
955    def answers(self, other):
956        return isinstance(other, UDS_CDTCI)
957
958
959bind_layers(UDS, UDS_CDTCIPR, service=0x54)
960
961
962# #########################RDTCI###################################
963class UDS_RDTCI(Packet):
964    reportTypes = {
965        0: 'ISOSAEReserved',
966        1: 'reportNumberOfDTCByStatusMask',
967        2: 'reportDTCByStatusMask',
968        3: 'reportDTCSnapshotIdentification',
969        4: 'reportDTCSnapshotRecordByDTCNumber',
970        5: 'reportDTCSnapshotRecordByRecordNumber',
971        6: 'reportDTCExtendedDataRecordByDTCNumber',
972        7: 'reportNumberOfDTCBySeverityMaskRecord',
973        8: 'reportDTCBySeverityMaskRecord',
974        9: 'reportSeverityInformationOfDTC',
975        10: 'reportSupportedDTC',
976        11: 'reportFirstTestFailedDTC',
977        12: 'reportFirstConfirmedDTC',
978        13: 'reportMostRecentTestFailedDTC',
979        14: 'reportMostRecentConfirmedDTC',
980        15: 'reportMirrorMemoryDTCByStatusMask',
981        16: 'reportMirrorMemoryDTCExtendedDataRecordByDTCNumber',
982        17: 'reportNumberOfMirrorMemoryDTCByStatusMask',
983        18: 'reportNumberOfEmissionsRelatedOBDDTCByStatusMask',
984        19: 'reportEmissionsRelatedOBDDTCByStatusMask',
985        20: 'reportDTCFaultDetectionCounter',
986        21: 'reportDTCWithPermanentStatus'
987    }
988    dtcStatus = {
989        1: 'TestFailed',
990        2: 'TestFailedThisOperationCycle',
991        4: 'PendingDTC',
992        8: 'ConfirmedDTC',
993        16: 'TestNotCompletedSinceLastClear',
994        32: 'TestFailedSinceLastClear',
995        64: 'TestNotCompletedThisOperationCycle',
996        128: 'WarningIndicatorRequested'
997    }
998    dtcStatusMask = {
999        1: 'ActiveDTCs',
1000        4: 'PendingDTCs',
1001        8: 'ConfirmedOrStoredDTCs',
1002        255: 'AllRecordDTCs'
1003    }
1004    dtcSeverityMask = {
1005        # 0: 'NoSeverityInformation',
1006        1: 'NoClassInformation',
1007        2: 'WWH-OBDClassA',
1008        4: 'WWH-OBDClassB1',
1009        8: 'WWH-OBDClassB2',
1010        16: 'WWH-OBDClassC',
1011        32: 'MaintenanceRequired',
1012        64: 'CheckAtNextHalt',
1013        128: 'CheckImmediately'
1014    }
1015    name = 'ReadDTCInformation'
1016    fields_desc = [
1017        ByteEnumField('reportType', 0, reportTypes),
1018        ConditionalField(FlagsField('DTCSeverityMask', 0, 8, dtcSeverityMask),
1019                         lambda pkt: pkt.reportType in [0x07, 0x08]),
1020        ConditionalField(FlagsField('DTCStatusMask', 0, 8, dtcStatusMask),
1021                         lambda pkt: pkt.reportType in [
1022                             0x01, 0x02, 0x07, 0x08, 0x0f, 0x11, 0x12, 0x13]),
1023        ConditionalField(PacketField("dtc", None, pkt_cls=DTC),
1024                         lambda pkt: pkt.reportType in [0x3, 0x4, 0x6,
1025                                                        0x10, 0x09]),
1026        ConditionalField(ByteField('DTCSnapshotRecordNumber', 0),
1027                         lambda pkt: pkt.reportType in [0x3, 0x4, 0x5]),
1028        ConditionalField(ByteField('DTCExtendedDataRecordNumber', 0),
1029                         lambda pkt: pkt.reportType in [0x6, 0x10])
1030    ]
1031
1032
1033bind_layers(UDS, UDS_RDTCI, service=0x19)
1034
1035
1036class DTCAndStatusRecord(Packet):
1037    name = 'DTC and status record'
1038    fields_desc = [
1039        PacketField("dtc", None, pkt_cls=DTC),
1040        FlagsField("status", 0, 8, UDS_RDTCI.dtcStatus)
1041    ]
1042
1043    def extract_padding(self, s):
1044        return '', s
1045
1046
1047class DTCExtendedData(Packet):
1048    name = 'Diagnostic Trouble Code Extended Data'
1049    dataTypes = ObservableDict()
1050    fields_desc = [
1051        ByteEnumField("data_type", 0, dataTypes),
1052        XByteField("record", 0)
1053    ]
1054
1055    def extract_padding(self, s):
1056        return '', s
1057
1058
1059class DTCExtendedDataRecord(Packet):
1060    fields_desc = [
1061        PacketField("dtcAndStatus", None, pkt_cls=DTCAndStatusRecord),
1062        PacketListField("extendedData", None, pkt_cls=DTCExtendedData)
1063    ]
1064
1065
1066class DTCSnapshot(Packet):
1067    identifiers = defaultdict(list)  # for later extension
1068
1069    @staticmethod
1070    def next_identifier_cb(pkt, lst, cur, remain):
1071        return Raw
1072
1073    fields_desc = [
1074        ByteField("record_number", 0),
1075        ByteField("record_number_of_identifiers", 0),
1076        PacketListField(
1077            "snapshotData", None,
1078            next_cls_cb=lambda pkt, lst, cur, remain: DTCSnapshot.next_identifier_cb(
1079                pkt, lst, cur, remain))
1080    ]
1081
1082    def extract_padding(self, s):
1083        return '', s
1084
1085
1086class DTCSnapshotRecord(Packet):
1087    fields_desc = [
1088        PacketField("dtcAndStatus", None, pkt_cls=DTCAndStatusRecord),
1089        PacketListField("snapshots", None, pkt_cls=DTCSnapshot)
1090    ]
1091
1092
1093class UDS_RDTCIPR(Packet):
1094    name = 'ReadDTCInformationPositiveResponse'
1095    fields_desc = [
1096        ByteEnumField('reportType', 0, UDS_RDTCI.reportTypes),
1097        ConditionalField(
1098            FlagsField('DTCStatusAvailabilityMask', 0, 8, UDS_RDTCI.dtcStatus),
1099            lambda pkt: pkt.reportType in [0x01, 0x07, 0x11, 0x12, 0x02, 0x0A,
1100                                           0x0B, 0x0C, 0x0D, 0x0E, 0x0F, 0x13,
1101                                           0x15]),
1102        ConditionalField(ByteEnumField('DTCFormatIdentifier', 0,
1103                                       {0: 'ISO15031-6DTCFormat',
1104                                        1: 'UDS-1DTCFormat',
1105                                        2: 'SAEJ1939-73DTCFormat',
1106                                        3: 'ISO11992-4DTCFormat'}),
1107                         lambda pkt: pkt.reportType in [0x01, 0x07,
1108                                                        0x11, 0x12]),
1109        ConditionalField(ShortField('DTCCount', 0),
1110                         lambda pkt: pkt.reportType in [0x01, 0x07,
1111                                                        0x11, 0x12]),
1112        ConditionalField(PacketListField('DTCAndStatusRecord', None,
1113                                         pkt_cls=DTCAndStatusRecord),
1114                         lambda pkt: pkt.reportType in [0x02, 0x0A, 0x0B,
1115                                                        0x0C, 0x0D, 0x0E,
1116                                                        0x0F, 0x13, 0x15]),
1117        ConditionalField(StrField('dataRecord', b""),
1118                         lambda pkt: pkt.reportType in [0x03, 0x08, 0x09,
1119                                                        0x10, 0x14]),
1120        ConditionalField(PacketField('snapshotRecord', None,
1121                                     pkt_cls=DTCSnapshotRecord),
1122                         lambda pkt: pkt.reportType in [0x04]),
1123        ConditionalField(PacketField('extendedDataRecord', None,
1124                                     pkt_cls=DTCExtendedDataRecord),
1125                         lambda pkt: pkt.reportType in [0x06])
1126    ]
1127
1128    def answers(self, other):
1129        if not isinstance(other, UDS_RDTCI):
1130            return False
1131        if not other.reportType == self.reportType:
1132            return False
1133        if self.reportType == 0x06:
1134            return other.dtc == self.extendedDataRecord.dtcAndStatus.dtc
1135        if self.reportType == 0x04:
1136            return other.dtc == self.snapshotRecord.dtcAndStatus.dtc
1137        return True
1138
1139
1140bind_layers(UDS, UDS_RDTCIPR, service=0x59)
1141
1142
1143# #########################RC###################################
1144class UDS_RC(Packet):
1145    routineControlTypes = {
1146        0: 'ISOSAEReserved',
1147        1: 'startRoutine',
1148        2: 'stopRoutine',
1149        3: 'requestRoutineResults'
1150    }
1151    routineControlIdentifiers = ObservableDict()
1152    name = 'RoutineControl'
1153    fields_desc = [
1154        ByteEnumField('routineControlType', 0, routineControlTypes),
1155        XShortEnumField('routineIdentifier', 0, routineControlIdentifiers)
1156    ]
1157
1158
1159bind_layers(UDS, UDS_RC, service=0x31)
1160
1161
1162class UDS_RCPR(Packet):
1163    name = 'RoutineControlPositiveResponse'
1164    fields_desc = [
1165        ByteEnumField('routineControlType', 0, UDS_RC.routineControlTypes),
1166        XShortEnumField('routineIdentifier', 0,
1167                        UDS_RC.routineControlIdentifiers),
1168    ]
1169
1170    def answers(self, other):
1171        return isinstance(other, UDS_RC) \
1172            and other.routineControlType == self.routineControlType \
1173            and other.routineIdentifier == self.routineIdentifier
1174
1175
1176bind_layers(UDS, UDS_RCPR, service=0x71)
1177
1178
1179# #########################RD###################################
1180class UDS_RD(Packet):
1181    dataFormatIdentifiers = ObservableDict({
1182        0: 'noCompressionNoEncryption'
1183    })
1184    name = 'RequestDownload'
1185    fields_desc = [
1186        ByteEnumField('dataFormatIdentifier', 0, dataFormatIdentifiers),
1187        BitField('memorySizeLen', 0, 4),
1188        BitField('memoryAddressLen', 0, 4),
1189        ConditionalField(XByteField('memoryAddress1', 0),
1190                         lambda pkt: pkt.memoryAddressLen == 1),
1191        ConditionalField(XShortField('memoryAddress2', 0),
1192                         lambda pkt: pkt.memoryAddressLen == 2),
1193        ConditionalField(X3BytesField('memoryAddress3', 0),
1194                         lambda pkt: pkt.memoryAddressLen == 3),
1195        ConditionalField(XIntField('memoryAddress4', 0),
1196                         lambda pkt: pkt.memoryAddressLen == 4),
1197        ConditionalField(XByteField('memorySize1', 0),
1198                         lambda pkt: pkt.memorySizeLen == 1),
1199        ConditionalField(XShortField('memorySize2', 0),
1200                         lambda pkt: pkt.memorySizeLen == 2),
1201        ConditionalField(X3BytesField('memorySize3', 0),
1202                         lambda pkt: pkt.memorySizeLen == 3),
1203        ConditionalField(XIntField('memorySize4', 0),
1204                         lambda pkt: pkt.memorySizeLen == 4)
1205    ]
1206
1207
1208bind_layers(UDS, UDS_RD, service=0x34)
1209
1210
1211class UDS_RDPR(Packet):
1212    name = 'RequestDownloadPositiveResponse'
1213    fields_desc = [
1214        BitField('memorySizeLen', 0, 4),
1215        BitField('reserved', 0, 4),
1216        StrField('maxNumberOfBlockLength', b"", fmt="B"),
1217    ]
1218
1219    def answers(self, other):
1220        return isinstance(other, UDS_RD)
1221
1222
1223bind_layers(UDS, UDS_RDPR, service=0x74)
1224
1225
1226# #########################RU###################################
1227class UDS_RU(Packet):
1228    name = 'RequestUpload'
1229    fields_desc = [
1230        ByteEnumField('dataFormatIdentifier', 0,
1231                      UDS_RD.dataFormatIdentifiers),
1232        BitField('memorySizeLen', 0, 4),
1233        BitField('memoryAddressLen', 0, 4),
1234        ConditionalField(XByteField('memoryAddress1', 0),
1235                         lambda pkt: pkt.memoryAddressLen == 1),
1236        ConditionalField(XShortField('memoryAddress2', 0),
1237                         lambda pkt: pkt.memoryAddressLen == 2),
1238        ConditionalField(X3BytesField('memoryAddress3', 0),
1239                         lambda pkt: pkt.memoryAddressLen == 3),
1240        ConditionalField(XIntField('memoryAddress4', 0),
1241                         lambda pkt: pkt.memoryAddressLen == 4),
1242        ConditionalField(XByteField('memorySize1', 0),
1243                         lambda pkt: pkt.memorySizeLen == 1),
1244        ConditionalField(XShortField('memorySize2', 0),
1245                         lambda pkt: pkt.memorySizeLen == 2),
1246        ConditionalField(X3BytesField('memorySize3', 0),
1247                         lambda pkt: pkt.memorySizeLen == 3),
1248        ConditionalField(XIntField('memorySize4', 0),
1249                         lambda pkt: pkt.memorySizeLen == 4)
1250    ]
1251
1252
1253bind_layers(UDS, UDS_RU, service=0x35)
1254
1255
1256class UDS_RUPR(Packet):
1257    name = 'RequestUploadPositiveResponse'
1258    fields_desc = [
1259        BitField('memorySizeLen', 0, 4),
1260        BitField('reserved', 0, 4),
1261        StrField('maxNumberOfBlockLength', b"", fmt="B"),
1262    ]
1263
1264    def answers(self, other):
1265        return isinstance(other, UDS_RU)
1266
1267
1268bind_layers(UDS, UDS_RUPR, service=0x75)
1269
1270
1271# #########################TD###################################
1272class UDS_TD(Packet):
1273    name = 'TransferData'
1274    fields_desc = [
1275        ByteField('blockSequenceCounter', 0),
1276        StrField('transferRequestParameterRecord', b"", fmt="B")
1277    ]
1278
1279
1280bind_layers(UDS, UDS_TD, service=0x36)
1281
1282
1283class UDS_TDPR(Packet):
1284    name = 'TransferDataPositiveResponse'
1285    fields_desc = [
1286        ByteField('blockSequenceCounter', 0),
1287        StrField('transferResponseParameterRecord', b"", fmt="B")
1288    ]
1289
1290    def answers(self, other):
1291        return isinstance(other, UDS_TD) \
1292            and other.blockSequenceCounter == self.blockSequenceCounter
1293
1294
1295bind_layers(UDS, UDS_TDPR, service=0x76)
1296
1297
1298# #########################RTE###################################
1299class UDS_RTE(Packet):
1300    name = 'RequestTransferExit'
1301    fields_desc = [
1302        StrField('transferRequestParameterRecord', b"", fmt="B")
1303    ]
1304
1305
1306bind_layers(UDS, UDS_RTE, service=0x37)
1307
1308
1309class UDS_RTEPR(Packet):
1310    name = 'RequestTransferExitPositiveResponse'
1311    fields_desc = [
1312        StrField('transferResponseParameterRecord', b"", fmt="B")
1313    ]
1314
1315    def answers(self, other):
1316        return isinstance(other, UDS_RTE)
1317
1318
1319bind_layers(UDS, UDS_RTEPR, service=0x77)
1320
1321
1322# #########################RFT###################################
1323class UDS_RFT(Packet):
1324    name = 'RequestFileTransfer'
1325
1326    modeOfOperations = {
1327        0x00: "ISO/SAE Reserved",
1328        0x01: "Add File",
1329        0x02: "Delete File",
1330        0x03: "Replace File",
1331        0x04: "Read File",
1332        0x05: "Read Directory"
1333    }
1334
1335    @staticmethod
1336    def _contains_file_size(packet):
1337        return packet.modeOfOperation not in [2, 4, 5]
1338
1339    fields_desc = [
1340        XByteEnumField('modeOfOperation', 0, modeOfOperations),
1341        FieldLenField('filePathAndNameLength', None,
1342                      length_of='filePathAndName', fmt='H'),
1343        StrLenField('filePathAndName', b"",
1344                    length_from=lambda p: p.filePathAndNameLength),
1345        ConditionalField(BitField('compressionMethod', 0, 4),
1346                         lambda p: p.modeOfOperation not in [2, 5]),
1347        ConditionalField(BitField('encryptingMethod', 0, 4),
1348                         lambda p: p.modeOfOperation not in [2, 5]),
1349        ConditionalField(FieldLenField('fileSizeParameterLength', None,
1350                                       fmt="B",
1351                                       length_of='fileSizeUnCompressed'),
1352                         lambda p: UDS_RFT._contains_file_size(p)),
1353        ConditionalField(StrLenField('fileSizeUnCompressed', b"",
1354                                     length_from=lambda p:
1355                                     p.fileSizeParameterLength),
1356                         lambda p: UDS_RFT._contains_file_size(p)),
1357        ConditionalField(StrLenField('fileSizeCompressed', b"",
1358                                     length_from=lambda p:
1359                                     p.fileSizeParameterLength),
1360                         lambda p: UDS_RFT._contains_file_size(p))
1361    ]
1362
1363
1364bind_layers(UDS, UDS_RFT, service=0x38)
1365
1366
1367class UDS_RFTPR(Packet):
1368    name = 'RequestFileTransferPositiveResponse'
1369
1370    @staticmethod
1371    def _contains_data_format_identifier(packet):
1372        return packet.modeOfOperation != 0x02
1373
1374    fields_desc = [
1375        XByteEnumField('modeOfOperation', 0, UDS_RFT.modeOfOperations),
1376        ConditionalField(FieldLenField('lengthFormatIdentifier', None,
1377                                       length_of='maxNumberOfBlockLength',
1378                                       fmt='B'),
1379                         lambda p: p.modeOfOperation != 2),
1380        ConditionalField(StrLenField('maxNumberOfBlockLength', b"",
1381                                     length_from=lambda p: p.lengthFormatIdentifier),
1382                         lambda p: p.modeOfOperation != 2),
1383        ConditionalField(BitField('compressionMethod', 0, 4),
1384                         lambda p: p.modeOfOperation != 0x02),
1385        ConditionalField(BitField('encryptingMethod', 0, 4),
1386                         lambda p: p.modeOfOperation != 0x02),
1387        ConditionalField(FieldLenField('fileSizeOrDirInfoParameterLength',
1388                                       None,
1389                                       length_of='fileSizeUncompressedOrDirInfoLength'),
1390                         lambda p: p.modeOfOperation not in [1, 2, 3]),
1391        ConditionalField(StrLenField('fileSizeUncompressedOrDirInfoLength',
1392                                     b"",
1393                                     length_from=lambda p:
1394                                     p.fileSizeOrDirInfoParameterLength),
1395                         lambda p: p.modeOfOperation not in [1, 2, 3]),
1396        ConditionalField(StrLenField('fileSizeCompressed', b"",
1397                                     length_from=lambda p:
1398                                     p.fileSizeOrDirInfoParameterLength),
1399                         lambda p: p.modeOfOperation not in [1, 2, 3, 5]),
1400    ]
1401
1402    def answers(self, other):
1403        return isinstance(other, UDS_RFT)
1404
1405
1406bind_layers(UDS, UDS_RFTPR, service=0x78)
1407
1408
1409# #########################IOCBI###################################
1410class UDS_IOCBI(Packet):
1411    name = 'InputOutputControlByIdentifier'
1412    fields_desc = [
1413        XShortEnumField('dataIdentifier', 0, UDS_RDBI.dataIdentifiers),
1414    ]
1415
1416
1417bind_layers(UDS, UDS_IOCBI, service=0x2F)
1418
1419
1420class UDS_IOCBIPR(Packet):
1421    name = 'InputOutputControlByIdentifierPositiveResponse'
1422    fields_desc = [
1423        XShortEnumField('dataIdentifier', 0, UDS_RDBI.dataIdentifiers),
1424    ]
1425
1426    def answers(self, other):
1427        return isinstance(other, UDS_IOCBI) \
1428            and other.dataIdentifier == self.dataIdentifier
1429
1430
1431bind_layers(UDS, UDS_IOCBIPR, service=0x6F)
1432
1433
1434# #########################NR###################################
1435class UDS_NR(Packet):
1436    negativeResponseCodes = {
1437        0x00: 'positiveResponse',
1438        0x10: 'generalReject',
1439        0x11: 'serviceNotSupported',
1440        0x12: 'subFunctionNotSupported',
1441        0x13: 'incorrectMessageLengthOrInvalidFormat',
1442        0x14: 'responseTooLong',
1443        0x20: 'ISOSAEReserved',
1444        0x21: 'busyRepeatRequest',
1445        0x22: 'conditionsNotCorrect',
1446        0x23: 'ISOSAEReserved',
1447        0x24: 'requestSequenceError',
1448        0x25: 'noResponseFromSubnetComponent',
1449        0x26: 'failurePreventsExecutionOfRequestedAction',
1450        0x31: 'requestOutOfRange',
1451        0x33: 'securityAccessDenied',
1452        0x34: 'authenticationRequired',
1453        0x35: 'invalidKey',
1454        0x36: 'exceedNumberOfAttempts',
1455        0x37: 'requiredTimeDelayNotExpired',
1456        0x3A: 'secureDataVerificationFailed',
1457        0x50: 'certificateVerificationFailedInvalidTimePeriod',
1458        0x51: 'certificateVerificationFailedInvalidSignature',
1459        0x52: 'certificateVerificationFailedInvalidChainOfTrust',
1460        0x53: 'certificateVerificationFailedInvalidType',
1461        0x54: 'certificateVerificationFailedInvalidFormat',
1462        0x55: 'certificateVerificationFailedInvalidContent',
1463        0x56: 'certificateVerificationFailedInvalidScope',
1464        0x57: 'certificateVerificationFailedInvalidCertificateRevoked',
1465        0x58: 'ownershipVerificationFailed',
1466        0x59: 'challengeCalculationFailed',
1467        0x5a: 'settingAccessRightsFailed',
1468        0x5b: 'sessionKeyCreationOrDerivationFailed',
1469        0x5c: 'configurationDataUsageFailed',
1470        0x5d: 'deAuthenticationFailed',
1471        0x70: 'uploadDownloadNotAccepted',
1472        0x71: 'transferDataSuspended',
1473        0x72: 'generalProgrammingFailure',
1474        0x73: 'wrongBlockSequenceCounter',
1475        0x78: 'requestCorrectlyReceived-ResponsePending',
1476        0x7E: 'subFunctionNotSupportedInActiveSession',
1477        0x7F: 'serviceNotSupportedInActiveSession',
1478        0x80: 'ISOSAEReserved',
1479        0x81: 'rpmTooHigh',
1480        0x82: 'rpmTooLow',
1481        0x83: 'engineIsRunning',
1482        0x84: 'engineIsNotRunning',
1483        0x85: 'engineRunTimeTooLow',
1484        0x86: 'temperatureTooHigh',
1485        0x87: 'temperatureTooLow',
1486        0x88: 'vehicleSpeedTooHigh',
1487        0x89: 'vehicleSpeedTooLow',
1488        0x8a: 'throttle/PedalTooHigh',
1489        0x8b: 'throttle/PedalTooLow',
1490        0x8c: 'transmissionRangeNotInNeutral',
1491        0x8d: 'transmissionRangeNotInGear',
1492        0x8e: 'ISOSAEReserved',
1493        0x8f: 'brakeSwitch(es)NotClosed',
1494        0x90: 'shifterLeverNotInPark',
1495        0x91: 'torqueConverterClutchLocked',
1496        0x92: 'voltageTooHigh',
1497        0x93: 'voltageTooLow',
1498    }
1499    name = 'NegativeResponse'
1500    fields_desc = [
1501        XByteEnumField('requestServiceId', 0, UDS.services),
1502        ByteEnumField('negativeResponseCode', 0, negativeResponseCodes)
1503    ]
1504
1505    def answers(self, other):
1506        return self.requestServiceId == other.service and \
1507            (self.negativeResponseCode != 0x78 or
1508             conf.contribs['UDS']['treat-response-pending-as-answer'])
1509
1510
1511bind_layers(UDS, UDS_NR, service=0x7f)
1512
1513
1514# ##################################################################
1515# ######################## UTILS ###################################
1516# ##################################################################
1517
1518
1519class UDS_TesterPresentSender(PeriodicSenderThread):
1520    def __init__(self, sock, pkt=UDS() / UDS_TP(subFunction=0x80), interval=2):
1521        """ Thread to send TesterPresent messages packets periodically
1522
1523        Args:
1524            sock: socket where packet is sent periodically
1525            pkt: packet to send
1526            interval: interval between two packets
1527        """
1528        PeriodicSenderThread.__init__(self, sock, pkt, interval)
1529