• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1% Regression tests for EcuAnsweringMachine
2
3+ Configuration
4~ conf
5
6= Imports
7
8from test.testsocket import TestSocket, cleanup_testsockets
9
10############
11############
12+ Load general modules
13
14= Load contribution layer
15
16load_contrib("automotive.uds", globals_dict=globals())
17load_contrib("automotive.ecu", globals_dict=globals())
18load_contrib("automotive.uds_ecu_states", globals_dict=globals())
19
20ecu = TestSocket(UDS)
21tester = TestSocket(UDS)
22ecu.pair(tester)
23
24+ Simulator tests
25
26= Simple check with RDBI and Negative Response
27
28example_responses = \
29    [EcuResponse([EcuState(session=1)], responses=UDS() / UDS_RDBIPR(dataIdentifier=0x1234) / Raw(b"deadbeef"))]
30
31success = False
32answering_machine = EcuAnsweringMachine(supported_responses=example_responses, main_socket=ecu, basecls=UDS, verbose=False)
33sim = threading.Thread(target=answering_machine, kwargs={'timeout': 60, 'stop_filter': lambda p: p.service==0xff})
34sim.start()
35try:
36    resp = tester.sr1(UDS()/UDS_RDBI(identifiers=[0x123]), timeout=1, verbose=False)
37    assert resp.negativeResponseCode == 0x10
38    assert resp.requestServiceId == 34
39    resp = tester.sr1(UDS(service=0x22), timeout=1, verbose=False)
40    assert resp.negativeResponseCode == 0x10
41    assert resp.requestServiceId == 34
42    resp = tester.sr1(UDS() / UDS_RDBI(identifiers=[0x1234]), timeout=1, verbose=False)
43    assert resp.service == 0x62
44    assert resp.dataIdentifier == 0x1234
45    assert resp.load == b"deadbeef"
46    success = True
47except Exception as ex:
48    print(ex)
49finally:
50    tester.send(UDS(service=0xff))
51    sim.join(timeout=10)
52
53assert success
54
55= Simple check with different Sessions
56
57example_responses = \
58    [EcuResponse(EcuState(session=2),         responses=UDS() / UDS_RDBIPR(dataIdentifier=2) / Raw(b"deadbeef1")),
59     EcuResponse(EcuState(session=[3, 4]),    responses=UDS() / UDS_RDBIPR(dataIdentifier=3) / Raw(b"deadbeef2")),
60     EcuResponse(EcuState(session=[5, 6, 7]), responses=UDS() / UDS_RDBIPR(dataIdentifier=5) / Raw(b"deadbeef3")),
61     EcuResponse(EcuState(session=[8, 9]),    responses=UDS() / UDS_RDBIPR(dataIdentifier=9) / Raw(b"deadbeef4"))]
62
63success = False
64
65answering_machine = EcuAnsweringMachine(supported_responses=example_responses, main_socket=ecu, basecls=UDS)
66sim = threading.Thread(target=answering_machine, kwargs={'timeout': 60, 'stop_filter': lambda p: p.service==0xff})
67sim.start()
68try:
69    resp = tester.sr1(UDS()/UDS_RDBI(identifiers=[2, 3, 5, 9]), timeout=1, verbose=False)
70    assert resp.negativeResponseCode == 0x10
71    assert resp.requestServiceId == 34
72    answering_machine.state.session = 2
73    resp = tester.sr1(UDS() / UDS_RDBI(identifiers=[2, 3, 5, 9]), timeout=1, verbose=False)
74    assert resp.service == 0x62
75    assert resp.dataIdentifier == 2
76    assert resp.load == b"deadbeef1"
77    answering_machine.state.session = 4
78    resp = tester.sr1(UDS() / UDS_RDBI(identifiers=[2, 3, 5, 9]), timeout=1, verbose=False)
79    assert resp.service == 0x62
80    assert resp.dataIdentifier == 3
81    assert resp.load == b"deadbeef2"
82    answering_machine.state.session = 6
83    resp = tester.sr1(UDS() / UDS_RDBI(identifiers=[2, 3, 5, 9]), timeout=1, verbose=False)
84    assert resp.service == 0x62
85    assert resp.dataIdentifier == 5
86    assert resp.load == b"deadbeef3"
87    answering_machine.state.session = 9
88    resp = tester.sr1(UDS() / UDS_RDBI(identifiers=[2, 3, 5, 9]), timeout=1, verbose=False)
89    assert resp.service == 0x62
90    assert resp.dataIdentifier == 9
91    assert resp.load == b"deadbeef4"
92    success = True
93except Exception as ex:
94    print(ex)
95finally:
96    tester.send(UDS(service=0xff))
97    sim.join(timeout=10)
98
99assert success
100
101= Simple check with different Sessions and diagnosticSessionControl
102
103example_responses = \
104    [EcuResponse(EcuState(session=2),            responses=UDS() / UDS_RDBIPR(dataIdentifier=2) / Raw(b"deadbeef1")),
105     EcuResponse(EcuState(session=range(3,5)),   responses=UDS() / UDS_RDBIPR(dataIdentifier=3) / Raw(b"deadbeef2")),
106     EcuResponse(EcuState(session=[5,6,7]),      responses=UDS() / UDS_RDBIPR(dataIdentifier=5) / Raw(b"deadbeef3")),
107     EcuResponse(EcuState(session=9),            responses=UDS() / UDS_RDBIPR(dataIdentifier=9) / Raw(b"deadbeef4")),
108     EcuResponse([EcuState(), EcuState(session=range(0,8))],   responses=UDS() / UDS_DSCPR(diagnosticSessionType=1, sessionParameterRecord=b"dead")),
109     EcuResponse([EcuState(), EcuState(session=range(0,8))],   responses=UDS() / UDS_DSCPR(diagnosticSessionType=2, sessionParameterRecord=b"dead")),
110     EcuResponse([EcuState(), EcuState(session=range(0,8))],   responses=UDS() / UDS_DSCPR(diagnosticSessionType=3, sessionParameterRecord=b"dead")),
111     EcuResponse([EcuState(), EcuState(session=range(0,8))],   responses=UDS() / UDS_DSCPR(diagnosticSessionType=4, sessionParameterRecord=b"dead")),
112     EcuResponse([EcuState(), EcuState(session=range(0,8))],   responses=UDS() / UDS_DSCPR(diagnosticSessionType=5, sessionParameterRecord=b"dead")),
113     EcuResponse([EcuState(), EcuState(session=range(0,8))],   responses=UDS() / UDS_DSCPR(diagnosticSessionType=6, sessionParameterRecord=b"dead")),
114     EcuResponse([EcuState(), EcuState(session=range(0,8))],   responses=UDS() / UDS_DSCPR(diagnosticSessionType=7, sessionParameterRecord=b"dead")),
115     EcuResponse([EcuState(), EcuState(session=range(0,8))],   responses=UDS() / UDS_DSCPR(diagnosticSessionType=8, sessionParameterRecord=b"dead")),
116     EcuResponse([EcuState(), EcuState(session=range(8,10))],  responses=UDS() / UDS_DSCPR(diagnosticSessionType=9, sessionParameterRecord=b"dead1")),
117     EcuResponse([EcuState(), EcuState(session=range(8,10))],  responses=UDS() / UDS_DSCPR(diagnosticSessionType=9, sessionParameterRecord=b"dead2")),
118     EcuResponse(EcuState(session=range(0,255)), responses=UDS() / UDS_NR(negativeResponseCode=0x7f, requestServiceId=0x10))]
119
120success = False
121
122answering_machine = EcuAnsweringMachine(supported_responses=example_responses, main_socket=ecu, basecls=UDS)
123sim = threading.Thread(target=answering_machine, kwargs={'timeout': 60, 'stop_filter': lambda p: p.service==0xff})
124sim.start()
125try:
126    resp = tester.sr1(UDS()/UDS_RDBI(identifiers=[2, 3, 5, 9]), timeout=1, verbose=False)
127    assert resp.negativeResponseCode == 0x10
128    assert resp.requestServiceId == 34
129    resp = tester.sr1(UDS()/UDS_DSC(diagnosticSessionType=2), timeout=1, verbose=False)
130    assert resp.service == 0x50
131    assert resp.diagnosticSessionType == 2
132    assert resp.sessionParameterRecord == b"dead"
133    resp = tester.sr1(UDS() / UDS_RDBI(identifiers=[2, 3, 5, 9]), timeout=1, verbose=False)
134    assert resp.service == 0x62
135    assert resp.dataIdentifier == 2
136    assert resp.load == b"deadbeef1"
137    resp = tester.sr1(UDS()/UDS_DSC(diagnosticSessionType=4), timeout=1, verbose=False)
138    assert resp.service == 0x50
139    assert resp.diagnosticSessionType == 4
140    assert resp.sessionParameterRecord == b"dead"
141    resp = tester.sr1(UDS() / UDS_RDBI(identifiers=[2, 3, 5, 9]), timeout=1, verbose=False)
142    assert resp.service == 0x62
143    assert resp.dataIdentifier == 3
144    assert resp.load == b"deadbeef2"
145    resp = tester.sr1(UDS()/UDS_DSC(diagnosticSessionType=6), timeout=1, verbose=False)
146    assert resp.service == 0x50
147    assert resp.diagnosticSessionType == 6
148    assert resp.sessionParameterRecord == b"dead"
149    resp = tester.sr1(UDS() / UDS_RDBI(identifiers=[2, 3, 5, 9]), timeout=1, verbose=False)
150    assert resp.service == 0x62
151    assert resp.dataIdentifier == 5
152    assert resp.load == b"deadbeef3"
153    resp = tester.sr1(UDS()/UDS_DSC(diagnosticSessionType=8), timeout=1, verbose=False)
154    assert resp.service == 0x50
155    assert resp.diagnosticSessionType == 8
156    assert resp.sessionParameterRecord == b"dead"
157    resp = tester.sr1(UDS() / UDS_DSC(diagnosticSessionType=9), timeout=1, verbose=False)
158    assert resp.service == 0x50
159    assert resp.diagnosticSessionType == 9
160    assert resp.sessionParameterRecord == b"dead1"
161    resp = tester.sr1(UDS() / UDS_RDBI(identifiers=[2, 3, 5, 9]), timeout=1, verbose=False)
162    assert resp.service == 0x62
163    assert resp.dataIdentifier == 9
164    assert resp.load == b"deadbeef4"
165    success = True
166except Exception as ex:
167    print(ex)
168finally:
169    tester.send(UDS(service=0xff))
170    sim.join(timeout=10)
171
172assert success
173
174= Simple check with different Sessions and diagnosticSessionControl and answers hook
175
176def custom_answers(resp, req):
177    if req.service + 0x40 != resp.service:
178        return False
179    if hasattr(req, "diagnosticSessionType"):
180        if 0 < req.diagnosticSessionType <= 8:
181            resp.diagnosticSessionType = req.diagnosticSessionType
182            return resp.answers(req)
183    return False
184
185example_responses = \
186    [EcuResponse(EcuState(session=2),            responses=UDS() / UDS_RDBIPR(dataIdentifier=2) / Raw(b"deadbeef1")),
187     EcuResponse(EcuState(session=range(3,5)),   responses=UDS() / UDS_RDBIPR(dataIdentifier=3) / Raw(b"deadbeef2")),
188     EcuResponse(EcuState(session=[5,6,7]),      responses=UDS() / UDS_RDBIPR(dataIdentifier=5) / Raw(b"deadbeef3")),
189     EcuResponse(EcuState(session=[9, 10]),      responses=UDS() / UDS_RDBIPR(dataIdentifier=9) / Raw(b"deadbeef4")),
190     EcuResponse(EcuState(session=range(0,8)),   responses=UDS() / UDS_DSCPR(diagnosticSessionType=1, sessionParameterRecord=b"dead"), answers=custom_answers),
191     EcuResponse(EcuState(session=range(8,10)),  responses=UDS() / UDS_DSCPR(diagnosticSessionType=9, sessionParameterRecord=b"dead1")),
192     EcuResponse(EcuState(session=range(8,10)),  responses=UDS() / UDS_DSCPR(diagnosticSessionType=9, sessionParameterRecord=b"dead2")),
193     EcuResponse(EcuState(session=range(0,255)), responses=UDS() / UDS_NR(negativeResponseCode=0x7f, requestServiceId=0x10))]
194
195success = False
196
197answering_machine = EcuAnsweringMachine(supported_responses=example_responses, main_socket=ecu, basecls=UDS)
198sim = threading.Thread(target=answering_machine, kwargs={'timeout': 60, 'stop_filter': lambda p: p.service==0xff})
199sim.start()
200try:
201    resp = tester.sr1(UDS()/UDS_RDBI(identifiers=[2, 3, 5, 9]), timeout=1, verbose=False)
202    assert resp.negativeResponseCode == 0x10
203    assert resp.requestServiceId == 34
204    resp = tester.sr1(UDS()/UDS_DSC(diagnosticSessionType=2), timeout=1, verbose=False)
205    assert resp.service == 0x50
206    assert resp.diagnosticSessionType == 2
207    assert resp.sessionParameterRecord == b"dead"
208    resp = tester.sr1(UDS() / UDS_RDBI(identifiers=[2, 3, 5, 9]), timeout=1, verbose=False)
209    assert resp.service == 0x62
210    assert resp.dataIdentifier == 2
211    assert resp.load == b"deadbeef1"
212    resp = tester.sr1(UDS()/UDS_DSC(diagnosticSessionType=4), timeout=1, verbose=False)
213    assert resp.service == 0x50
214    assert resp.diagnosticSessionType == 4
215    assert resp.sessionParameterRecord == b"dead"
216    resp = tester.sr1(UDS() / UDS_RDBI(identifiers=[2, 3, 5, 9]), timeout=1, verbose=False)
217    assert resp.service == 0x62
218    assert resp.dataIdentifier == 3
219    assert resp.load == b"deadbeef2"
220    resp = tester.sr1(UDS()/UDS_DSC(diagnosticSessionType=6), timeout=1, verbose=False)
221    assert resp.service == 0x50
222    assert resp.diagnosticSessionType == 6
223    assert resp.sessionParameterRecord == b"dead"
224    resp = tester.sr1(UDS() / UDS_RDBI(identifiers=[2, 3, 5, 9]), timeout=1, verbose=False)
225    assert resp.service == 0x62
226    assert resp.dataIdentifier == 5
227    assert resp.load == b"deadbeef3"
228    resp = tester.sr1(UDS()/UDS_DSC(diagnosticSessionType=8), timeout=1, verbose=False)
229    assert resp.service == 0x50
230    assert resp.diagnosticSessionType == 8
231    assert resp.sessionParameterRecord == b"dead"
232    resp = tester.sr1(UDS() / UDS_DSC(diagnosticSessionType=9), timeout=1, verbose=False)
233    assert resp.service == 0x50
234    assert resp.diagnosticSessionType == 9
235    assert resp.sessionParameterRecord == b"dead1"
236    resp = tester.sr1(UDS() / UDS_RDBI(identifiers=[2, 3, 5, 9]), timeout=1, verbose=False)
237    assert resp.service == 0x62
238    assert resp.dataIdentifier == 9
239    assert resp.load == b"deadbeef4"
240    success = True
241except Exception as ex:
242    print(ex)
243finally:
244    tester.send(UDS(service=0xff))
245    sim.join(timeout=10)
246
247assert success
248
249= Simple check with security access and answers hook
250
251security_seed = b"abcd"
252
253def custom_answers(resp, req):
254    global security_seed
255    if req.service + 0x40 != resp.service or req.service != 0x27:
256        return False
257    if req.securityAccessType == 1:
258        resp.securitySeed = security_seed
259        return resp.answers(req)
260    elif req.securityAccessType == 2:
261        return resp.answers(req) and req.securityKey == security_seed + security_seed
262    return False
263
264example_responses = \
265    [EcuResponse(EcuState(session=range(0,255)), responses=UDS() / UDS_SAPR(securityAccessType=1, securitySeed=b"1234"), answers=custom_answers),
266     EcuResponse(EcuState(session=range(0,255)), responses=UDS() / UDS_SAPR(securityAccessType=2), answers=custom_answers),
267     EcuResponse(EcuState(session=range(0,255)), responses=UDS() / UDS_NR(negativeResponseCode=0x35, requestServiceId=0x27)),
268     EcuResponse(EcuState(session=range(0,255)), responses=UDS() / UDS_NR(negativeResponseCode=0x7f, requestServiceId=0x10))]
269
270success = False
271
272answering_machine = EcuAnsweringMachine(supported_responses=example_responses, main_socket=ecu, basecls=UDS)
273sim = threading.Thread(target=answering_machine, kwargs={'timeout': 10, 'stop_filter': lambda p: p.service==0xff})
274sim.start()
275try:
276    resp = tester.sr1(UDS() / UDS_SA(securityAccessType=1), timeout=1, verbose=False)
277    assert resp.service == 0x67
278    assert resp.securitySeed == b"abcd"
279    resp = tester.sr1(UDS() / UDS_SA(securityAccessType=2, securityKey=resp.securitySeed), timeout=1, verbose=False)
280    assert resp.service == 0x7f
281    assert resp.negativeResponseCode == 0x35
282    resp = tester.sr1(UDS() / UDS_SA(securityAccessType=1), timeout=1, verbose=False)
283    assert resp.service == 0x67
284    assert resp.securitySeed == b"abcd"
285    resp = tester.sr1(UDS() / UDS_SA(securityAccessType=2, securityKey=resp.securitySeed+resp.securitySeed), timeout=1, verbose=False)
286    assert resp.service == 0x67
287    success = True
288except Exception as ex:
289    print(ex)
290finally:
291    tester.send(UDS(service=0xff))
292    sim.join(timeout=10)
293
294assert success
295
296= Simple check with security access and answers hook and request-correctly-received message
297
298security_seed = b"abcd"
299
300def custom_answers(resp, req):
301    global security_seed
302    if req.service + 0x40 != resp.service or req.service != 0x27:
303        return False
304    if req.securityAccessType == 1:
305        resp.securitySeed = security_seed
306        return resp.answers(req)
307    elif req.securityAccessType == 2:
308        return resp.answers(req) and req.securityKey == security_seed + security_seed
309    return False
310
311example_responses = \
312    [EcuResponse(EcuState(session=range(0,255)), responses=[UDS()/UDS_NR(negativeResponseCode=0x78, requestServiceId=0x27), UDS() / UDS_SAPR(securityAccessType=1, securitySeed=b"1234")], answers=custom_answers),
313     EcuResponse(EcuState(session=range(0,255)), responses=UDS() / UDS_SAPR(securityAccessType=2), answers=custom_answers),
314     EcuResponse(EcuState(session=range(0,255)), responses=UDS() / UDS_NR(negativeResponseCode=0x35, requestServiceId=0x27)),
315     EcuResponse(EcuState(session=range(0,255)), responses=UDS() / UDS_NR(negativeResponseCode=0x7f, requestServiceId=0x10))]
316
317success = False
318
319answering_machine = EcuAnsweringMachine(supported_responses=example_responses, main_socket=ecu, basecls=UDS)
320sim = threading.Thread(target=answering_machine, kwargs={'timeout': 10, 'stop_filter': lambda p: p.service==0xff})
321sim.start()
322try:
323    resp = tester.sr1(UDS() / UDS_SA(securityAccessType=1), timeout=2, verbose=False)
324    assert resp.service == 0x67
325    assert resp.securitySeed == b"abcd"
326    resp = tester.sr1(UDS() / UDS_SA(securityAccessType=2, securityKey=resp.securitySeed), timeout=2, verbose=False)
327    assert resp.service == 0x7f
328    assert resp.negativeResponseCode == 0x35
329    resp = tester.sr1(UDS() / UDS_SA(securityAccessType=1), timeout=2, verbose=False)
330    assert resp.service == 0x67
331    assert resp.securitySeed == b"abcd"
332    resp = tester.sr1(UDS() / UDS_SA(securityAccessType=2, securityKey=resp.securitySeed+resp.securitySeed), timeout=2, verbose=False)
333    assert resp.service == 0x67
334    success = True
335except Exception as ex:
336    print(ex)
337finally:
338    tester.send(UDS(service=0xff))
339    sim.join(timeout=10)
340
341assert success
342
343= Simple check with security access and answers hook and request-correctly-received message 2
344
345security_seed = b"abcd"
346
347def custom_answers(resp, req):
348    global security_seed
349    if req.service + 0x40 != resp.service or req.service != 0x27:
350        return False
351    if req.securityAccessType == 1:
352        resp.securitySeed = security_seed
353        return resp.answers(req)
354    elif req.securityAccessType == 2:
355        return resp.answers(req) and req.securityKey == security_seed + security_seed
356    return False
357
358example_responses = \
359    [EcuResponse(EcuState(session=range(0,255)), responses=[UDS()/UDS_NR(negativeResponseCode=0x78, requestServiceId=0x27), UDS() / UDS_SAPR(securityAccessType=1, securitySeed=b"1234")], answers=custom_answers),
360     EcuResponse(EcuState(session=range(0,255)), responses=UDS() / UDS_SAPR(securityAccessType=2), answers=custom_answers),
361     EcuResponse(EcuState(session=range(0,255)), responses=UDS() / UDS_NR(negativeResponseCode=0x35, requestServiceId=0x27)),
362     EcuResponse(EcuState(session=range(0,255)), responses=UDS() / UDS_NR(negativeResponseCode=0x7f, requestServiceId=0x10))]
363
364conf.contribs['UDS']['treat-response-pending-as-answer'] = True
365
366success = False
367
368answering_machine = EcuAnsweringMachine(supported_responses=example_responses,
369                           main_socket=ecu, basecls=UDS)
370sim = threading.Thread(target=answering_machine, kwargs={'timeout':5, 'stop_filter': lambda p: p.service==0xff})
371sim.start()
372try:
373    resp = tester.sr1(UDS() / UDS_SA(securityAccessType=1), timeout=1, verbose=False)
374    assert resp.service == 0x7f
375    assert resp.negativeResponseCode == 0x78
376    resp = tester.sniff(timeout=2, count=1, verbose=False)[0]
377    assert resp.service == 0x67
378    assert resp.securitySeed == b"abcd"
379    resp = tester.sr1(UDS() / UDS_SA(securityAccessType=2, securityKey=resp.securitySeed), timeout=3, verbose=False)
380    assert resp.service == 0x7f
381    assert resp.negativeResponseCode == 0x35
382    resp = tester.sr1(UDS() / UDS_SA(securityAccessType=1), timeout=1, verbose=False)
383    assert resp.service == 0x7f
384    assert resp.negativeResponseCode == 0x78
385    resp = tester.sniff(timeout=2, count=1, verbose=False)[0]
386    assert resp.service == 0x67
387    assert resp.securitySeed == b"abcd"
388    resp = tester.sr1(UDS() / UDS_SA(securityAccessType=2, securityKey=resp.securitySeed+resp.securitySeed), timeout=1, verbose=False)
389    assert resp.service == 0x67
390    success = True
391except Exception as ex:
392    print(ex)
393finally:
394    tester.send(UDS(service=0xff))
395    sim.join(timeout=10)
396
397assert success
398
399conf.contribs['UDS']['treat-response-pending-as-answer'] = False
400
401+ Cleanup
402
403= Delete TestSockets
404
405cleanup_testsockets()