• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1% Regression tests for Simulated ECUs and UDS Scanners
2~ scanner
3
4+ Configuration
5~ conf
6
7= Imports
8import io
9import pickle
10from scapy.contrib.isotp import ISOTPMessageBuilder
11from test.testsocket import TestSocket, cleanup_testsockets, UnstableSocket
12from scapy.automaton import ObjectPipe
13
14############
15############
16+ Load general modules
17
18= Load contribution layer
19
20
21from scapy.contrib.automotive.uds import *
22from scapy.contrib.automotive.uds_ecu_states import *
23from scapy.contrib.automotive.uds_scan import *
24from scapy.contrib.automotive.ecu import *
25
26load_layer("can")
27
28conf.debug_dissector = False
29
30
31= Define Testfunction
32
def executeScannerInVirtualEnvironment(supported_responses, enumerators, unstable_socket=True, **kwargs):
    """Run a UDS_Scanner against an in-process simulated ECU and return it.
    An EcuAnsweringMachine serving ``supported_responses`` runs in a
    background thread on one end of a paired test-socket link, the scanner
    drives the other end. scan() is called up to 12 times, stopping early
    once ``scanner.scan_completed`` is set. On LINUX the finished scanner
    is additionally passed through pickle_test().
    :param supported_responses: responses handed to EcuAnsweringMachine
    :param enumerators: passed to UDS_Scanner as ``test_cases``
    :param unstable_socket: tester side uses UnstableSocket when True,
                            TestSocket otherwise
    :param kwargs: forwarded unchanged to UDS_Scanner
    :return: the UDS_Scanner instance after scanning
    :raises AssertionError: if the answering-machine thread is still alive
                            2 seconds after the stop marker was sent
    """
    # A packet with exactly these bytes makes the answering machine stop.
    stop_marker = b"\xff\xff\xff"
    tester_obj_pipe = ObjectPipe(name="TesterPipe")
    ecu_obj_pipe = ObjectPipe(name="ECUPipe")
    TesterSocket = UnstableSocket if unstable_socket else TestSocket
    tester = TesterSocket(UDS, tester_obj_pipe)
    ecu = TestSocket(UDS, ecu_obj_pipe)
    tester.pair(ecu)
    answering_machine = EcuAnsweringMachine(
        supported_responses=supported_responses, main_socket=ecu,
        basecls=UDS, verbose=False)
    def reset():
        # Reset handler for the scanner: ECU state back to session 1, then
        # a very short sniff on both sockets (presumably to drain packets
        # that are still queued - TODO confirm).
        answering_machine.state.reset()
        answering_machine.state["session"] = 1
        sniff(timeout=0.001, opened_socket=[ecu, tester])
    def reconnect():
        # NOTE(review): `tester` is assigned below, so Python treats it as
        # local to reconnect(). tester.close() therefore always raises
        # UnboundLocalError, which the except clause swallows, and the
        # previous tester socket is never closed. Behaviour is kept as-is
        # because the expectations in this campaign may depend on it -
        # confirm before switching to `nonlocal tester`.
        try:
            tester.close()
        except Exception:
            pass
        tester = TesterSocket(UDS, tester_obj_pipe)
        ecu.pair(tester)
        return tester
    def answering_machine_thread():
        answering_machine(
            timeout=120, stop_filter=lambda x: bytes(x) == stop_marker)
    sim = threading.Thread(target=answering_machine_thread)
    try:
        sim.start()
        scanner = UDS_Scanner(
            tester, reset_handler=reset, reconnect_handler=reconnect,
            test_cases=enumerators, timeout=0.1,
            retry_if_none_received=True, unittest=True,
            **kwargs)
        for i in range(12):
            print("Starting scan")
            scanner.scan(timeout=10)
            if scanner.scan_completed:
                print("Scan completed after %d iterations" % i)
                break
    finally:
        try:
            ecu.ins.send(Raw(stop_marker))
            sim.join(timeout=2)
            # Record liveness before cleanup so that closing the sockets
            # cannot hide a thread that ignored the stop marker.
            sim_still_running = sim.is_alive()
        finally:
            # Always release sockets and pipes, even if stopping failed.
            cleanup_testsockets()
            tester_obj_pipe.close()
            ecu_obj_pipe.close()
    # Checked after cleanup and outside the finally block, so a failing
    # check neither leaks resources nor masks an exception from the scan.
    assert not sim_still_running
    if LINUX:
        pickle_test(scanner)
    return scanner
82
def pickle_test(scanner):
    """Check that *scanner* survives a pickle round trip.
    Asserts that ``scan_completed`` and ``state_paths`` of the unpickled
    copy compare equal to those of the original.
    :param scanner: object to pickle and restore
    :raises AssertionError: if either attribute differs after the round trip
    """
    unp = pickle.loads(pickle.dumps(scanner))
    assert scanner.scan_completed == unp.scan_completed
    assert scanner.state_paths == unp.state_paths
89
90= Load packets from pcap
91
92conf.contribs['CAN']['swap-bytes'] = True
93pkts = rdpcap(scapy_path("test/pcaps/candump_uds_scanner.pcap.gz"))
94assert len(pkts)
95
96= Create UDS messages from packets
97
98builder = ISOTPMessageBuilder(basecls=UDS, use_ext_address=False, rx_id=[0x641, 0x651])
99msgs = list()
100
101for p in pkts:
102    if p.data == b"ECURESET":
103        msgs.append(p)
104    else:
105        builder.feed(p)
106        if len(builder):
107            msgs.append(builder.pop())
108
109assert len(msgs)
110
111= Create ECU-Clone from packets
112
113mEcu = Ecu(logging=False, verbose=False, store_supported_responses=True, lookahead=3)
114
115for p in msgs:
116    if isinstance(p, CAN) and p.data == b"ECURESET":
117        mEcu.reset()
118    else:
119        mEcu.update(p)
120
121assert len(mEcu.supported_responses)
122
123= Test UDS_SAEnumerator evaluate_response
124
125e = UDS_SAEnumerator()
126
127config = {}
128
129s = EcuState(session=1)
130
131assert False == e._evaluate_response(s, UDS(b"\x27\x01"), None, **config)
132config = {"exit_if_service_not_supported": True}
133assert not e._retry_pkt[s]
134assert True == e._evaluate_response(s, UDS(b"\x27\x01"), UDS(b"\x7f\x27\x11"), **config)
135assert not e._retry_pkt[s]
136assert True == e._evaluate_response(s, UDS(b"\x27\x01"), UDS(b"\x7f\x27\x24"), **config)
137assert e._retry_pkt[s] == UDS(b"\x27\x01")
138assert False == e._evaluate_response(s, UDS(b"\x27\x02"), UDS(b"\x7f\x27\x24"), **config)
139assert not e._retry_pkt[s]
140assert True == e._evaluate_response(s, UDS(b"\x27\x01"), UDS(b"\x7f\x27\x37"), **config)
141assert e._retry_pkt[s] == UDS(b"\x27\x01")
142assert False == e._evaluate_response(s, UDS(b"\x27\x01"), UDS(b"\x7f\x27\x37"), **config)
143assert not e._retry_pkt[s]
144assert True == e._evaluate_response(s, UDS(b"\x27\x01"), UDS(b"\x67\x01ab"), **config)
145assert not e._retry_pkt[s]
146assert False == e._evaluate_response(s, UDS(b"\x27\x01"), UDS(b"\x67\x02ab"), **config)
147assert not e._retry_pkt[s]
148
149
150= Test UDS_SA_XOR_Enumerator stand alone mode
151
152TesterSocket = TestSocket
153ecu_sock = TestSocket(UDS)
154mTester = TesterSocket(UDS)
155ecu_sock.pair(mTester)
156answering_machine = EcuAnsweringMachine(supported_responses=mEcu.supported_responses, main_socket=ecu_sock, basecls=UDS, verbose=False)
157sim = threading.Thread(target=answering_machine, kwargs={'timeout': 1000, "stop_filter": lambda x: bytes(x) == b"\xff\xff\xff"})
158sim.start()
159try:
160    resp = mTester.sr1(UDS()/UDS_TP(b"\x00"), verbose=False, timeout=1)
161    print(repr(resp))
162    assert resp and resp.service != 0x7f
163    resp = mTester.sr1(UDS()/UDS_DSC(diagnosticSessionType=3), verbose=False, timeout=1)
164    print(repr(resp))
165    assert resp and resp.service != 0x7f
166    assert UDS_SA_XOR_Enumerator().get_security_access(mTester, 1)
167finally:
168    mTester.send(Raw(b"\xff\xff\xff"))
169    sim.join(timeout=2)
170    cleanup_testsockets()
171
172
173= Test configuration validation
174
175try:
176    scanner = UDS_Scanner(TestSocket(UDS),
177                          test_cases=[UDS_SA_XOR_Enumerator, UDS_DSCEnumerator, UDS_ServiceEnumerator],
178                          UDS_DSCEnumerator_kwargs={"scan_range": range(0x1000), "delay_state_change": 0,
179                                                    "overwrite_timeout": False})
180    assert False
181except Scapy_Exception:
182    pass
183
184= Simulate ECU and run Scanner
185
186scanner = executeScannerInVirtualEnvironment(
187    mEcu.supported_responses,
188    [UDS_SA_XOR_Enumerator, UDS_DSCEnumerator, UDS_ServiceEnumerator],
189    UDS_DSCEnumerator_kwargs={"scan_range": range(5), "delay_state_change": 0,
190                              "overwrite_timeout": False},
191    UDS_SA_XOR_Enumerator_kwargs={"scan_range": range(5)},
192    UDS_ServiceEnumerator_kwargs={"scan_range": [0x10, 0x11, 0x14, 0x19, 0x22,
193                                                 0x23, 0x24, 0x27, 0x28, 0x29,
194                                                 0x2A, 0x2C, 0x2E, 0x2F, 0x31,
195                                                 0x34, 0x35, 0x36, 0x37, 0x38,
196                                                 0x3D, 0x3E, 0x83, 0x84, 0x85,
197                                                 0x87],
198                                  "request_length": 1})
199
200scanner.show_testcases()
201scanner.show_testcases_status()
202assert len(scanner.state_paths) == 5
203assert scanner.scan_completed
204assert scanner.progress() > 0.95
205
206assert EcuState(session=1) in scanner.final_states
207assert EcuState(session=2, tp=1) in scanner.final_states
208assert EcuState(session=3, tp=1) in scanner.final_states
209assert EcuState(session=2, tp=1, security_level=2) in scanner.final_states
210assert EcuState(session=3, tp=1, security_level=2) in scanner.final_states
211
212#################### UDS_SA_XOR_Enumerator ################
213tc = scanner.configuration.test_cases[0]
214
215assert len(tc.results_without_response) < 10
216if tc.results_without_response:
217    tc.show()
218
219assert len(tc.results_with_negative_response) == 19
220assert len(tc.results_with_positive_response) >= 6
221assert len(tc.scanned_states) == 5
222
223result = tc.show(dump=True)
224
225assert "serviceNotSupportedInActiveSession received 5 times" in result
226assert "incorrectMessageLengthOrInvalidFormat received 14 times" in result
227
228################# UDS_DSCEnumerator #####################
229tc = scanner.configuration.test_cases[1]
230
231assert len(tc.results_without_response) < 10
232if tc.results_without_response:
233    tc.show()
234
235assert len(tc.results_with_negative_response) == 20
236assert len(tc.results_with_positive_response) == 5
237assert len(tc.scanned_states) == 5
238
239result = tc.show(dump=True)
240
241assert "incorrectMessageLengthOrInvalidFormat received 20 times" in result
242
243###################### UDS_ServiceEnumerator ###################
244tc = scanner.configuration.test_cases[2]
245
246assert len(tc.results_without_response) < 10
247if tc.results_without_response:
248    tc.show()
249
250assert len(tc.results_with_negative_response) == 130
251assert len(tc.results_with_positive_response) == 0
252assert len(tc.scanned_states) == 5
253
254result = tc.show(dump=True)
255
256assert "incorrectMessageLengthOrInvalidFormat received 34 times" in result
257assert "serviceNotSupported received 75 times" in result
258assert "serviceNotSupportedInActiveSession received 19 times" in result
259assert "securityAccessDenied received 2 times" in result
260
261= UDS_ServiceEnumerator
262
def req_handler(resp, req):
    """Choose the negative response code for a service 0x22 request.
    The code written to ``resp.negativeResponseCode`` depends on
    ``len(req)``: 1 -> "generalReject",
    2 -> "incorrectMessageLengthOrInvalidFormat", 3 -> "requestOutOfRange".
    :param resp: response object, modified in place on a match
    :param req: received request
    :return: True if resp was prepared for req; False for any other
             service or any other request length
    """
    if req.service != 0x22:
        return False
    nrc_by_length = {
        1: "generalReject",
        2: "incorrectMessageLengthOrInvalidFormat",
        3: "requestOutOfRange",
    }
    nrc = nrc_by_length.get(len(req))
    if nrc is None:
        return False
    resp.negativeResponseCode = nrc
    return True
276
277resps = [EcuResponse(None, [UDS()/UDS_NR(negativeResponseCode="subFunctionNotSupported", requestServiceId="ReadDataByIdentifier")], req_handler)]
278
279es = [UDS_ServiceEnumerator]
280
281debug_dissector_backup = conf.debug_dissector
282
283# This Enumerator is sending corrupted Packets, therefore we need to disable the debug_dissector
284conf.debug_dissector = False
285scanner = executeScannerInVirtualEnvironment(
286    resps, es, UDS_ServiceEnumerator_kwargs={"request_length": 3}, unstable_socket=False)
287conf.debug_dissector = debug_dissector_backup
288
289assert scanner.scan_completed
290assert scanner.progress() > 0.95
291tc = scanner.configuration.test_cases[0]
292
293assert len(tc.results_without_response) < 10
294if tc.results_without_response:
295    tc.show()
296
297tc.show()
298
299assert len(tc.results_with_negative_response) == 128 * 3
300assert len(tc.results_with_positive_response) == 0
301assert len(tc.scanned_states) == 1
302
303result = tc.show(dump=True)
304
305assert "incorrectMessageLengthOrInvalidFormat" in result
306assert "requestOutOfRange" in result
307
308= UDS_RDBIEnumerator
309
310resps = [EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=1)/Raw(b"asdfbeef1")]),
311         EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=2)/Raw(b"beef2")]),
312         EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=3)/Raw(b"beef3")]),
313         EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0xff)/Raw(b"beefff")]),
314         EcuResponse(None, [UDS()/UDS_NR(negativeResponseCode="subFunctionNotSupported", requestServiceId="ReadDataByIdentifier")])]
315
316es = [UDS_RDBIEnumerator]
317
318scanner = executeScannerInVirtualEnvironment(
319    resps, es, UDS_RDBIEnumerator_kwargs={"scan_range": range(0x100)})
320
321assert scanner.scan_completed
322assert scanner.progress() > 0.95
323tc = scanner.configuration.test_cases[0]
324
325assert len(tc.results_without_response) < 10
326if tc.results_without_response:
327    tc.show()
328
329assert len(tc.results_with_negative_response) == 256 - 4
330assert len(tc.results_with_positive_response) == 4
331assert len(tc.scanned_states) == 1
332
333result = tc.show(dump=True)
334
335assert "asdfbeef1" in result
336assert "beef2" in result
337assert "beef3" in result
338assert "beefff" in result
339assert "subFunctionNotSupported received" in result
340
341ids = [t.req.identifiers[0] for t in tc.results_with_positive_response]
342
343assert 1 in ids
344assert 2 in ids
345assert 3 in ids
346assert 0xff in ids
347
348= UDS_RDBISelectiveEnumerator
349~ not_pypy
350
351resps = [EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x101)/Raw(b"asdfbeef1")]),
352         EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x102)/Raw(b"beef2")]),
353         EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x103)/Raw(b"beef3")]),
354         EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x104)/Raw(b"beef3")]),
355         EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x105)/Raw(b"beef3")]),
356         EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x106)/Raw(b"beef3")]),
357         EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x107)/Raw(b"beef3")]),
358         EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x108)/Raw(b"beef3")]),
359         EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x109)/Raw(b"beef3")]),
360         EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x110)/Raw(b"beef3")]),
361         EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x111)/Raw(b"beef3")]),
362         EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x112)/Raw(b"beef3")]),
363         EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x113)/Raw(b"beef3")]),
364         EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x114)/Raw(b"beef3")]),
365         EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x115)/Raw(b"beef3")]),
366         EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x116)/Raw(b"beef3")]),
367         EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x117)/Raw(b"beef3")]),
368         EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x118)/Raw(b"beef3")]),
369         EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x119)/Raw(b"beef3")]),
370         EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x120)/Raw(b"beef3")]),
371         EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x121)/Raw(b"beef3")]),
372         EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x122)/Raw(b"beef3")]),
373         EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x123)/Raw(b"beef3")]),
374         EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x124)/Raw(b"beef3")]),
375         EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x125)/Raw(b"beef3")]),
376         EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x126)/Raw(b"beef3")]),
377         EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x127)/Raw(b"beef3")]),
378         EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x128)/Raw(b"beef3")]),
379         EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x129)/Raw(b"beef3")]),
380         EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x130)/Raw(b"beef3")]),
381         EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x131)/Raw(b"beef3")]),
382         EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x132)/Raw(b"beef3")]),
383         EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x133)/Raw(b"beef3")]),
384         EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x134)/Raw(b"beef3")]),
385         EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x135)/Raw(b"beef35")]),
386         EcuResponse(None, [UDS()/UDS_NR(negativeResponseCode="subFunctionNotSupported", requestServiceId="ReadDataByIdentifier")])]
387
388es = [UDS_RDBISelectiveEnumerator]
389
390scanner = executeScannerInVirtualEnvironment(resps, es, UDS_RDBIRandomEnumerator_kwargs={"probe_start": 0, "probe_end": 0x500})
391
392assert scanner.scan_completed
393assert scanner.progress() > 0.95
394
395tc = scanner.configuration.stages[0][0]
396
397tc.show()
398
399assert len(tc.results_without_response) < 10
400if tc.results_without_response:
401    tc.show()
402
403assert len(tc.results_with_negative_response) > 100
404assert len(tc.results_with_positive_response) >= 1
405assert len(tc.scanned_states) == 1
406
407assert scanner.scan_completed
408assert scanner.progress() > 0.95
409
410tc = scanner.configuration.stages[0][1]
411
412tc.show()
413
414assert len(tc.results_without_response) < 10
415if tc.results_without_response:
416    tc.show()
417
418assert len(tc.results_with_negative_response) == 29
419assert len(tc.results_with_positive_response) == 35
420assert len(tc.scanned_states) == 1
421
422result = tc.show(dump=True)
423
424assert "asdfbeef1" in result
425assert "beef2" in result
426assert "beef3" in result
427assert "beef35" in result
428assert "subFunctionNotSupported received" in result
429
430ids = [t.req.identifiers[0] for t in tc.results_with_positive_response]
431
432assert 0x101 in ids
433assert 0x102 in ids
434assert 0x103 in ids
435assert 0x135 in ids
436
437= UDS_WDBIEnumerator
438
def wdbi_handler(resp, req):
    """Answer a service 0x2E request and verify the payload written.
    The request's ``dataIdentifier`` is copied into *resp*; the request's
    ``load`` must equal the payload expected for that identifier.
    :param resp: response object, ``dataIdentifier`` is set in place
    :param req: received request
    :return: False if req is not service 0x2E, True otherwise
    :raises AssertionError: on an unexpected identifier or payload
    """
    if req.service != 0x2E:
        return False
    # Payload expected for each identifier the test is allowed to write.
    expected_load = {
        1: b'asdfbeef1',
        2: b'beef2',
        3: b"beef3",
        0xff: b"beefff",
    }
    assert req.dataIdentifier in expected_load
    resp.dataIdentifier = req.dataIdentifier
    assert req.load == expected_load[req.dataIdentifier]
    return True
457
458resps = [EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=1)/Raw(b"asdfbeef1")]),
459         EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=2)/Raw(b"beef2")]),
460         EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=3)/Raw(b"beef3")]),
461         EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0xff)/Raw(b"beefff")]),
462         EcuResponse(None, [UDS()/UDS_WDBIPR()], answers=wdbi_handler),
463         EcuResponse(None, [UDS()/UDS_NR(negativeResponseCode="subFunctionNotSupported", requestServiceId="ReadDataByIdentifier")])]
464
465es = [UDS_WDBISelectiveEnumerator()]
466
467scanner = executeScannerInVirtualEnvironment(
468    resps, es, UDS_RDBIEnumerator_kwargs={"scan_range": range(0x100)})
469
470assert scanner.scan_completed
471assert scanner.progress() > 0.95
472
473tc = scanner.configuration.stages[0][0]
474
475assert len(tc.results_without_response) < 10
476if tc.results_without_response:
477    tc.show()
478
479assert len(tc.results_with_negative_response) == 256 - 4
480assert len(tc.results_with_positive_response) == 4
481assert len(tc.scanned_states) == 1
482
483result = tc.show(dump=True)
484
485assert "asdfbeef1" in result
486assert "beef2" in result
487assert "beef3" in result
488assert "beefff" in result
489assert "subFunctionNotSupported received" in result
490
491ids = [t.req.identifiers[0] for t in tc.results_with_positive_response]
492
493assert 1 in ids
494assert 2 in ids
495assert 3 in ids
496assert 0xff in ids
497
498######################### WDBI #############################
499tc = scanner.configuration.stages[0][1]
500
501assert len(tc.results_without_response) < 10
502if tc.results_without_response:
503    tc.show()
504
505assert len(tc.results_with_negative_response) == 0
506assert len(tc.results_with_positive_response) == 4
507assert len(tc.scanned_states) == 1
508
509result = tc.show(dump=True)
510
511ids = [t.req.dataIdentifier for t in tc.results_with_positive_response]
512
513assert 1 in ids
514assert 2 in ids
515assert 3 in ids
516assert 0xff in ids
517
518= UDS_WDBIEnumerator 2
519
def wdbi_handler(resp, req):
    """Answer a service 0x2E request and verify the payload written.
    The request's ``dataIdentifier`` is copied into *resp*; the request's
    ``load`` must equal the payload expected for that identifier.
    :param resp: response object, ``dataIdentifier`` is set in place
    :param req: received request
    :return: False if req is not service 0x2E, True otherwise
    :raises AssertionError: on an unexpected identifier or payload
    """
    if req.service != 0x2E:
        return False
    # Payload expected for each identifier the test is allowed to write.
    expected_load = {
        1: b'asdfbeef1',
        2: b'beef2',
        3: b"beef3",
        0xff: b"beefff",
    }
    assert req.dataIdentifier in expected_load
    resp.dataIdentifier = req.dataIdentifier
    assert req.load == expected_load[req.dataIdentifier]
    return True
538
539resps = [EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=1)/Raw(b"asdfbeef1")]),
540         EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=2)/Raw(b"beef2")]),
541         EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=3)/Raw(b"beef3")]),
542         EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0xff)/Raw(b"beefff")]),
543         EcuResponse(None, [UDS()/UDS_WDBIPR()], answers=wdbi_handler),
544         EcuResponse(None, [UDS()/UDS_NR(negativeResponseCode="subFunctionNotSupported", requestServiceId="ReadDataByIdentifier")])]
545
546es = [UDS_WDBISelectiveEnumerator]
547
548scanner = executeScannerInVirtualEnvironment(
549    resps, es, UDS_RDBIEnumerator_kwargs={"scan_range": range(0x100)})
550
551assert scanner.scan_completed
552assert scanner.progress() > 0.95
553
554tc = scanner.configuration.test_cases[0][0]
555
556assert len(tc.results_without_response) < 10
557if tc.results_without_response:
558    tc.show()
559
560assert len(tc.results_with_negative_response) == 256 - 4
561assert len(tc.results_with_positive_response) == 4
562assert len(tc.scanned_states) == 1
563
564result = tc.show(dump=True)
565
566assert "asdfbeef1" in result
567assert "beef2" in result
568assert "beef3" in result
569assert "beefff" in result
570assert "subFunctionNotSupported received" in result
571
572ids = [t.req.identifiers[0] for t in tc.results_with_positive_response]
573
574assert 1 in ids
575assert 2 in ids
576assert 3 in ids
577assert 0xff in ids
578
579######################### WDBI #############################
580tc = scanner.configuration.test_cases[0][1]
581
582assert len(tc.results_without_response) < 10
583if tc.results_without_response:
584    tc.show()
585
586assert len(tc.results_with_negative_response) == 0
587assert len(tc.results_with_positive_response) == 4
588assert len(tc.scanned_states) == 1
589
590result = tc.show(dump=True)
591
592ids = [t.req.dataIdentifier for t in tc.results_with_positive_response]
593
594assert 1 in ids
595assert 2 in ids
596assert 3 in ids
597assert 0xff in ids
598
599
600= UDS_TPEnumerator
601
602resps = [EcuResponse(None, [UDS()/UDS_TPPR()]),
603         EcuResponse(None, [UDS()/UDS_NR(negativeResponseCode="serviceNotSupported", requestServiceId="TesterPresent")])]
604
605es = [UDS_TPEnumerator]
606
607scanner = executeScannerInVirtualEnvironment(resps, es)
608
609assert scanner.scan_completed
610assert scanner.progress() > 0.95
611
612tc = scanner.configuration.test_cases[0]
613
614assert len(tc.results_without_response) < 10
615if tc.results_without_response:
616    tc.show()
617
618assert len(tc.results_with_negative_response) == 0
619assert len(tc.results_with_positive_response) == 2
620assert len(tc.scanned_states) == 2
621
622assert tc.show(dump=True)
623
624= UDS_EREnumerator
625
626resps = [EcuResponse(None, [UDS()/UDS_ERPR(resetType=1)]),
627         EcuResponse(None, [UDS()/UDS_ERPR(resetType=3)]),
628         EcuResponse(None, [UDS()/UDS_NR(negativeResponseCode="subFunctionNotSupported", requestServiceId="ECUReset")])]
629
630es = [UDS_EREnumerator]
631
632scanner = executeScannerInVirtualEnvironment(resps, es)
633
634assert scanner.scan_completed
635assert scanner.progress() > 0.95
636
637tc = scanner.configuration.test_cases[0]
638
639assert len(tc.results_without_response) < 10
640if tc.results_without_response:
641    tc.show()
642
643assert len(tc.results_with_negative_response) == 256 - 2
644assert len(tc.results_with_positive_response) == 2
645assert len(tc.scanned_states) == 1
646
647result = tc.show(dump=True)
648
649assert "hardReset" in result
650assert "softReset" in result
651assert "subFunctionNotSupported received" in result
652
653ids = [t.req.resetType for t in tc.results_with_positive_response]
654
655assert 1 in ids
656assert 3 in ids
657
658
659= UDS_CCEnumerator
660
661resps = [EcuResponse(None, [UDS()/UDS_CCPR(controlType=1)]),
662         EcuResponse(None, [UDS()/UDS_CCPR(controlType=3)]),
663         EcuResponse(None, [UDS()/UDS_NR(negativeResponseCode="subFunctionNotSupported", requestServiceId="CommunicationControl")])]
664
665es = [UDS_CCEnumerator]
666
667scanner = executeScannerInVirtualEnvironment(resps, es, inter=0.001)
668
669assert scanner.scan_completed
670assert scanner.progress() > 0.95
671
672tc = scanner.configuration.test_cases[0]
673
674assert len(tc.results_without_response) < 10
675if tc.results_without_response:
676    tc.show()
677
678assert len(tc.results_with_negative_response) == 256 - 2
679assert len(tc.results_with_positive_response) == 2
680assert len(tc.scanned_states) == 1
681
682result = tc.show(dump=True)
683
684assert "enableRxAndDisableTx" in result
685assert "disableRxAndTx" in result
686assert "subFunctionNotSupported received" in result
687
688ids = [t.req.controlType for t in tc.results_with_positive_response]
689
690assert 1 in ids
691assert 3 in ids
692
693= UDS_RDBPIEnumerator
694
695UDS_RDBPI.periodicDataIdentifiers[1] = "identifierElectric"
696UDS_RDBPI.periodicDataIdentifiers[3] = "identifierGas"
697
698resps = [EcuResponse(None, [UDS()/UDS_RDBPIPR(periodicDataIdentifier=1, dataRecord=b'electric')]),
699         EcuResponse(None, [UDS()/UDS_RDBPIPR(periodicDataIdentifier=3, dataRecord=b'gas')]),
700         EcuResponse(None, [UDS()/UDS_NR(negativeResponseCode="subFunctionNotSupported", requestServiceId="ReadDataPeriodicIdentifier")])]
701
702es = [UDS_RDBPIEnumerator]
703
704scanner = executeScannerInVirtualEnvironment(resps, es)
705
706assert scanner.scan_completed
707assert scanner.progress() > 0.95
708
709tc = scanner.configuration.test_cases[0]
710
711assert len(tc.results_without_response) < 10
712if tc.results_without_response:
713    tc.show()
714
715assert len(tc.results_with_negative_response) == 256 - 2
716assert len(tc.results_with_positive_response) == 2
717assert len(tc.scanned_states) == 1
718
719result = tc.show(dump=True)
720
721assert "electric" in result
722assert "gas" in result
723assert "0x01 identifierElectric" in result
724assert "0x03 identifierGas" in result
725assert "subFunctionNotSupported received" in result
726
727ids = [t.req.periodicDataIdentifier for t in tc.results_with_positive_response]
728
729assert 1 in ids
730assert 3 in ids
731
732
733= UDS_RCEnumerator
734
735resps = [EcuResponse(None, [UDS()/UDS_RCPR(routineControlType=1, routineIdentifier=1)/Raw(b"asdfbeef1")]),
736         EcuResponse(None, [UDS()/UDS_RCPR(routineControlType=2, routineIdentifier=2)/Raw(b"beef2")]),
737         EcuResponse(None, [UDS()/UDS_RCPR(routineControlType=3, routineIdentifier=3)/Raw(b"beef3")]),
738         EcuResponse(None, [UDS()/UDS_RCPR(routineControlType=1, routineIdentifier=0x10)/Raw(b"beefff")]),
739         EcuResponse(None, [UDS()/UDS_NR(negativeResponseCode="subFunctionNotSupported", requestServiceId="RoutineControl")])]
740
741es = [UDS_RCEnumerator]
742
743scanner = executeScannerInVirtualEnvironment(resps, es, UDS_RCEnumerator_kwargs={"scan_range": range(0x11)})
744
745assert scanner.scan_completed
746assert scanner.progress() > 0.95
747
748tc = scanner.configuration.test_cases[0]
749
750assert len(tc.results_without_response) < 10
751if tc.results_without_response:
752    tc.show()
753
754assert len(tc.results_with_negative_response) == 0x11 * 3 - 4
755assert len(tc.results_with_positive_response) == 4
756assert len(tc.scanned_states) == 1
757
758result = tc.show(dump=True)
759
760assert "subFunctionNotSupported received" in result
761
762ids = [t.req.routineIdentifier for t in tc.results_with_positive_response]
763
764assert 1 in ids
765assert 2 in ids
766assert 3 in ids
767assert 0x10 in ids
768
769
770= UDS_RCSelectiveEnumerator
771
772resps = [EcuResponse(None, [UDS()/UDS_RCPR(routineControlType=1, routineIdentifier=1)/Raw(b"asdfbeef1")]),
773         EcuResponse(None, [UDS()/UDS_RCPR(routineControlType=2, routineIdentifier=1)/Raw(b"beef2")]),
774         EcuResponse(None, [UDS()/UDS_RCPR(routineControlType=3, routineIdentifier=1)/Raw(b"beef3")]),
775         EcuResponse(None, [UDS()/UDS_RCPR(routineControlType=1, routineIdentifier=0x10)/Raw(b"beefff")]),
776         EcuResponse(None, [UDS()/UDS_NR(negativeResponseCode="subFunctionNotSupported", requestServiceId="RoutineControl")])]
777
778es = [UDS_RCSelectiveEnumerator]
779
780scanner = executeScannerInVirtualEnvironment(resps, es, UDS_RCStartEnumerator_kwargs={"scan_range": range(0x11)})
781
782assert scanner.scan_completed
783assert scanner.progress() > 0.95
784
785tc = scanner.configuration.stages[0][0]
786
787assert len(tc.results_without_response) < 10
788if tc.results_without_response:
789    tc.show()
790
791assert len(tc.results_with_negative_response) == 0x11 - 2
792assert len(tc.results_with_positive_response) == 2
793assert len(tc.scanned_states) == 1
794
795result = tc.show(dump=True)
796
797assert "subFunctionNotSupported received" in result
798
799ids = [t.req.routineIdentifier for t in tc.results_with_positive_response]
800
801assert 1 in ids
802assert 0x10 in ids
803
804tc = scanner.configuration.stages[0][1]
805
806assert len(tc.results_without_response) < 10
807if tc.results_without_response:
808    tc.show()
809
810assert len(tc.results_with_negative_response) == 538
811assert len(tc.results_with_positive_response) == 2
812assert len(tc.scanned_states) == 1
813
814result = tc.show(dump=True)
815
816assert "subFunctionNotSupported received" in result
817
818ids = [t.req.routineIdentifier for t in tc.results_with_positive_response]
819
820assert 1 in ids
821
822= UDS_IOCBIEnumerator
823
824resps = [EcuResponse(None, [UDS()/UDS_IOCBIPR(dataIdentifier=1)/Raw(b"asdfbeef1")]),
825         EcuResponse(None, [UDS()/UDS_IOCBIPR(dataIdentifier=2)/Raw(b"beef2")]),
826         EcuResponse(None, [UDS()/UDS_IOCBIPR(dataIdentifier=3)/Raw(b"beef3")]),
827         EcuResponse(None, [UDS()/UDS_IOCBIPR(dataIdentifier=0xff)/Raw(b"beefff")]),
828         EcuResponse(None, [UDS()/UDS_NR(negativeResponseCode="subFunctionNotSupported", requestServiceId="InputOutputControlByIdentifier")])]
829
830es = [UDS_IOCBIEnumerator]
831
832scanner = executeScannerInVirtualEnvironment(resps, es, UDS_IOCBIEnumerator_kwargs={"scan_range": range(0x100)})
833
834assert scanner.scan_completed
835assert scanner.progress() > 0.95
836
837tc = scanner.configuration.test_cases[0]
838
839assert len(tc.results_without_response) < 10
840if tc.results_without_response:
841    tc.show()
842
843assert len(tc.results_with_negative_response) == 0x100 - 4
844assert len(tc.results_with_positive_response) == 4
845assert len(tc.scanned_states) == 1
846
847result = tc.show(dump=True)
848
849assert "asdfbeef1" in result
850assert "beef2" in result
851assert "beef3" in result
852assert "beefff" in result
853assert "subFunctionNotSupported received" in result
854
855ids = [t.req.dataIdentifier for t in tc.results_with_positive_response]
856
857assert 1 in ids
858assert 2 in ids
859assert 3 in ids
860assert 0xff in ids
861
862
863= UDS_RDEnumerator
864
# Simulated ECU memory: every valid address maps to its own low byte.
memory = {
    addr: addr & 0xff
    for addr in itertools.chain(range(0x1f00), range(0xd000, 0xfff2),
                                range(0xa000, 0xcf00), range(0x2000, 0x5f00))
}

def answers_rd(resp, req):
    """Decide whether a service 0x34 request gets the positive response.
    A request is accepted when neither ``memorySizeLen`` nor
    ``memoryAddressLen`` is 1, 3 or 4 and the requested address (read from
    the ``memoryAddress<memoryAddressLen>`` field) is a key of ``memory``.
    :param resp: response object, ``memorySizeLen`` is set on acceptance
    :param req: received request
    :return: True if the request is accepted, False otherwise
    """
    if req.service != 0x34:
        return False
    if req.memorySizeLen in [1, 3, 4]:
        return False
    if req.memoryAddressLen in [1, 3, 4]:
        return False
    addr = getattr(req, "memoryAddress%d" % req.memoryAddressLen)
    if addr not in memory:
        return False
    resp.memorySizeLen = req.memorySizeLen
    return True
883
# A positive response gated by answers_rd, followed by a requestOutOfRange
# negative response (presumably the fallback when answers_rd declines).
884resps = [EcuResponse(None, [UDS()/UDS_RDPR()], answers=answers_rd),
885         EcuResponse(None, [UDS()/UDS_NR(negativeResponseCode="requestOutOfRange", requestServiceId="RequestDownload")])]
886
887#######################################################
# Scan with the RD enumerator over a stable socket, passing unittest=True to the enumerator.
888scanner = executeScannerInVirtualEnvironment(
889    resps, [UDS_RDEnumerator], unstable_socket=False,
890    UDS_RDEnumerator_kwargs={"unittest": True})
891
892assert scanner.scan_completed
893assert scanner.progress() > 0.95
894
895tc1 = scanner.configuration.test_cases[0]
896
# A few unanswered requests are tolerated; show the results for debugging if any occurred.
897assert len(tc1.results_without_response) < 10
898if len(tc1.results_without_response):
899    tc1.show()
900
# Only lower bounds are checked for the number of negative and positive results.
901assert len(tc1.results_with_negative_response) > 400
902assert len(tc1.results_with_positive_response) > 40
903assert len(tc1.scanned_states) == 1
904
905result = tc1.show(dump=True)
906
907assert "requestOutOfRange received " in result
908
909= UDS_RMBARandomEnumerator
910
# With explicit arguments (4, 4, 10) both length fields must be 4 and the
# memory size 10; the address only has to be set.
911pkt = UDS_RMBARandomEnumerator._random_memory_addr_pkt(4, 4, 10)
912
913assert pkt.memorySizeLen == 4
914assert pkt.memoryAddressLen == 4
915assert pkt.memorySize4 == 10
916assert pkt.memoryAddress4 is not None
917
# Without arguments the length fields must lie between 1 and 4.
918pkt = UDS_RMBARandomEnumerator._random_memory_addr_pkt()
919
920assert pkt.memorySizeLen in [1, 2, 3, 4]
921assert pkt.memoryAddressLen in [1, 2, 3, 4]
922
# Two consecutively generated packets are expected to differ.
# NOTE(review): presumably relies on random generation, so a collision would fail this check - confirm.
923pkt2 = UDS_RMBARandomEnumerator._random_memory_addr_pkt()
924
925assert pkt != pkt2
926
927
928= UDS_RMBAEnumerator
929~ linux not_pypy
930
# Simulated ECU memory for the ReadMemoryByAddress test: four disjoint
# readable areas given as (start, end) with an exclusive end.
mem_areas = [(0x100, 0x1f00), (0xd000, 0xff00), (0xa000, 0xc000), (0x3000, 0x5f00)]

mem_ranges = [range(start, end) for start, end in mem_areas]

# First and last readable address of every area.
mem_inner_borders = [start for start, _ in mem_areas]
mem_inner_borders += [end - 1 for _, end in mem_areas]

# Addresses directly below and directly above every area (not readable).
mem_outer_borders = [start - 1 for start, _ in mem_areas]
mem_outer_borders += [end for _, end in mem_areas]

# Every readable address maps to its own low byte.
memory = {addr: addr & 0xff for addr in itertools.chain(*mem_ranges)}

# 100 randomly picked readable addresses. The candidate list is built once
# instead of being rebuilt for every single draw.
_readable_addrs = list(itertools.chain(*mem_ranges))
mem_random_test_points = [random.choice(_readable_addrs) for _ in range(100)]
949
def answers_rmba(resp, req):
    """Tell whether ``resp`` answers the service 0x23 request ``req``.
    Used as the ``answers`` callback of an EcuResponse. A request is only
    accepted if its service is 0x23, neither length field is 1, 3 or 4 and
    the start address is a key of the module-level ``memory`` dict. On
    acceptance ``resp.dataRecord`` is filled with the stored bytes of the
    requested span.
    :param resp: response packet, modified in place when the request matches
    :param req: request packet to examine
    :return: True if the request is accepted, False otherwise
    """
    skipped_lengths = (1, 3, 4)
    if (req.service != 0x23
            or req.memorySizeLen in skipped_lengths
            or req.memoryAddressLen in skipped_lengths):
        return False
    start = getattr(req, "memoryAddress%d" % req.memoryAddressLen)
    if start not in memory:
        return False
    length = getattr(req, "memorySize%d" % req.memorySizeLen)
    # Addresses of the span that are not part of the memory map are left out.
    resp.dataRecord = bytes(
        [memory[a] for a in range(start, start + length) if a in memory])
    return True
970
# A positive response whose dataRecord is filled in by answers_rmba, followed by a
# requestOutOfRange negative response (presumably the fallback when answers_rmba declines).
971resps = [EcuResponse(None, [UDS()/UDS_RMBAPR(dataRecord=b'')], answers=answers_rmba),
972         EcuResponse(None, [UDS()/UDS_NR(negativeResponseCode="requestOutOfRange", requestServiceId="ReadMemoryByAddress")])]
973
974#######################################################
# NOTE(review): the kwargs are keyed by UDS_RMBARandomEnumerator although UDS_RMBAEnumerator
# is passed - presumably the latter is composed of several stages including the former; confirm.
975scanner = executeScannerInVirtualEnvironment(
976    resps, [UDS_RMBAEnumerator], unstable_socket=False,
977    UDS_RMBARandomEnumerator_kwargs={"unittest": True})
978
979assert scanner.scan_completed
# Inspect the second entry of the first configured stage.
980tc1 = scanner.configuration.stages[0][1]
981
# Some unanswered requests are tolerated; show the results for debugging if any occurred.
982assert len(tc1.results_without_response) < 30
983if len(tc1.results_without_response):
984    tc1.show()
985
# Only lower bounds are checked for the number of negative and positive results.
986assert len(tc1.results_with_negative_response) > 10
987assert len(tc1.results_with_positive_response) > 300
988assert len(tc1.scanned_states) == 1
989
990result = tc1.show(dump=True)
991
992assert "requestOutOfRange received " in result
993
994############################################################
995
addrs = tc1._get_memory_addresses_from_results(tc1.results_with_positive_response)

# Share of the first/last readable addresses of each area that were found.
inner_border_ratio = float(sum(1 for tp in mem_inner_borders if tp in addrs)) / len(mem_inner_borders)
print(inner_border_ratio)
assert inner_border_ratio > 0.8
# Share of the randomly chosen readable addresses that were found.
random_point_ratio = float(sum(1 for tp in mem_random_test_points if tp in addrs)) / len(mem_random_test_points)
print(random_point_ratio)
assert random_point_ratio > 0.8
# Share of the addresses just outside each area that were correctly not found.
outer_border_ratio = float(sum(1 for tp in mem_outer_borders if tp not in addrs)) / len(mem_outer_borders)
print(outer_border_ratio)
assert outer_border_ratio > 0.7
1004
1005
1006= UDS_TDEnumerator
1007
# Positive responses for block sequence counters 1 and 3, plus a
# subFunctionNotSupported negative response for TransferData.
1008resps = [EcuResponse(None, [UDS()/UDS_TDPR(blockSequenceCounter=1)]),
1009         EcuResponse(None, [UDS()/UDS_TDPR(blockSequenceCounter=3)]),
1010         EcuResponse(None, [UDS()/UDS_NR(negativeResponseCode="subFunctionNotSupported", requestServiceId="TransferData")])]
1011
1012es = [UDS_TDEnumerator]
1013
# Scan with the enumerator's default arguments.
1014scanner = executeScannerInVirtualEnvironment(resps, es)
1015
1016assert scanner.scan_completed
1017assert scanner.progress() > 0.95
1018
1019tc = scanner.configuration.test_cases[0]
1020
# A few unanswered requests are tolerated; show the results for debugging if any occurred.
1021assert len(tc.results_without_response) < 10
1022if tc.results_without_response:
1023    tc.show()
1024
# 256 results in total are expected: two positive, all others negative.
1025assert len(tc.results_with_negative_response) == 256 - 2
1026assert len(tc.results_with_positive_response) == 2
1027assert len(tc.scanned_states) == 1
1028
1029result = tc.show(dump=True)
1030
1031assert "subFunctionNotSupported received" in result
1032
# Block sequence counters of all requests that received a positive response.
1033ids = [t.req.blockSequenceCounter for t in tc.results_with_positive_response]
1034
1035assert 1 in ids
1036assert 3 in ids
1037
1038= BMW_DevJobEnumerator
1039
# Load the BMW contrib modules this test depends on.
1040load_contrib("automotive.bmw.definitions")
1041load_contrib("automotive.bmw.enumerator")
1042
# Positive responses for identifiers 0xff00, 0xff02, 0xff03 and 0xffff, plus a
# subFunctionNotSupported negative response for DevelopmentJob.
1043resps = [EcuResponse(None, [UDS()/DEV_JOB_PR(identifier=0xff00)/Raw(b"asdfbeef1")]),
1044         EcuResponse(None, [UDS()/DEV_JOB_PR(identifier=0xff02)/Raw(b"beef2")]),
1045         EcuResponse(None, [UDS()/DEV_JOB_PR(identifier=0xff03)/Raw(b"beef3")]),
1046         EcuResponse(None, [UDS()/DEV_JOB_PR(identifier=0xffff)/Raw(b"beefff")]),
1047         EcuResponse(None, [UDS()/UDS_NR(negativeResponseCode="subFunctionNotSupported", requestServiceId="DevelopmentJob")])]
1048
1049es = [BMW_DevJobEnumerator]
1050
# Restrict the scan to the 0x100 identifiers 0xff00-0xffff.
1051scanner = executeScannerInVirtualEnvironment(resps, es, BMW_DevJobEnumerator_kwargs={"scan_range": range(0xFF00, 0x10000)})
1052
1053assert scanner.scan_completed
1054assert scanner.progress() > 0.95
1055
1056tc = scanner.configuration.test_cases[0]
1057
# A few unanswered requests are tolerated; show the results for debugging if any occurred.
1058assert len(tc.results_without_response) < 10
1059if tc.results_without_response:
1060    tc.show()
1061
# Of the 0x100 scanned identifiers, four must be answered positively and all others negatively.
1062assert len(tc.results_with_negative_response) == 0x100 - 4
1063assert len(tc.results_with_positive_response) == 4
1064assert len(tc.scanned_states) == 1
1065
1066result = tc.show(dump=True)
1067
# The report must contain these names and the decimal identifiers 65282 (0xff02) and
# 65283 (0xff03); the names are presumably resolved from the loaded BMW definitions.
1068assert "ReadTransportMessageStatus" in result
1069assert "65282" in result
1070assert "65283" in result
1071assert "ReadMemory" in result
1072assert "subFunctionNotSupported received" in result
1073assert "PR: Supported" in result
1074
# Identifiers of all requests that received a positive response.
1075ids = [t.req.identifier for t in tc.results_with_positive_response]
1076
1077assert 0xff00 in ids
1078assert 0xff02 in ids
1079assert 0xff03 in ids
1080assert 0xffff in ids
1081
1082= UDS_ServiceEnumerator weird issue
1083
# Negative responses only, for request service ids 0x40, 0x41, 0x11, 0x42 and 0x43.
# The first one uses the numeric response code 0x13 instead of a name.
1084resps = [EcuResponse(None, [UDS()/UDS_NR(negativeResponseCode=0x13, requestServiceId=0x40)]),
1085         EcuResponse(None, [UDS()/UDS_NR(negativeResponseCode="subFunctionNotSupported", requestServiceId=0x41)]),
1086         EcuResponse(None, [UDS()/UDS_NR(negativeResponseCode="subFunctionNotSupported", requestServiceId=0x11)]),
1087         EcuResponse(None, [UDS()/UDS_NR(negativeResponseCode="subFunctionNotSupported", requestServiceId=0x42)]),
1088         EcuResponse(None, [UDS()/UDS_NR(negativeResponseCode="subFunctionNotSupported", requestServiceId=0x43)])]
1089
1090es = [UDS_ServiceEnumerator]
1091
# Only four services are scanned; 0x43 has a response defined but is not part of the scan range.
1092scanner = executeScannerInVirtualEnvironment(
1093    resps, es, UDS_ServiceEnumerator_kwargs={"scan_range": [0x11, 0x40, 0x41, 0x42]})
1094
1095assert scanner.scan_completed
1096assert scanner.progress() > 0.95
1097tc = scanner.configuration.test_cases[0]
1098tc.show()
1099
# Each of the four scanned services must be recorded with a negative response.
1100assert len(tc.results_with_negative_response) == 4
1101
1102= UDS_ServiceEnumerator, all range
1103
def req_handler(resp, req):
    """Pick the negative response code for a service 0x22 request by its length.
    Used as the ``answers`` callback of an EcuResponse. Requests of length
    1, 2 and 3 are accepted and get a length-specific response code written
    to ``resp``; everything else is declined and ``resp`` stays untouched.
    :param resp: response packet, modified in place when the request matches
    :param req: request packet to examine
    :return: True if the request is accepted, False otherwise
    """
    if req.service != 0x22:
        return False
    codes_by_length = {
        1: "generalReject",
        2: "incorrectMessageLengthOrInvalidFormat",
        3: "requestOutOfRange",
    }
    code = codes_by_length.get(len(req))
    if code is None:
        return False
    resp.negativeResponseCode = code
    return True
1117
# One negative response for ReadDataByIdentifier; req_handler decides whether
# it applies and overrides the response code depending on the request length.
resps = [EcuResponse(None, [UDS()/UDS_NR(negativeResponseCode="subFunctionNotSupported", requestServiceId="ReadDataByIdentifier")], req_handler)]

es = [UDS_ServiceEnumerator]

debug_dissector_backup = conf.debug_dissector

# This enumerator sends corrupted packets, therefore the debug_dissector has to be disabled.
conf.debug_dissector = False
try:
    scanner = executeScannerInVirtualEnvironment(
        resps, es, UDS_ServiceEnumerator_kwargs={"request_length": 3, "scan_range": range(256)}, unstable_socket=False)
finally:
    # Restore the global setting even if the scan raises, so a failure here
    # cannot leak the changed configuration into later tests.
    conf.debug_dissector = debug_dissector_backup
1129
1130assert scanner.scan_completed
1131assert scanner.progress() > 0.95
1132tc = scanner.configuration.test_cases[0]
1133
# A few unanswered requests are tolerated; show the results for debugging if any occurred.
1134assert len(tc.results_without_response) < 10
1135if tc.results_without_response:
1136    tc.show()
1137
# The results are shown unconditionally as well.
1138tc.show()
1139
1140assert len(tc.scanned_states) == 1
1141
1142result = tc.show(dump=True)
1143
# Two of the response codes that req_handler can set must show up in the report.
1144assert "incorrectMessageLengthOrInvalidFormat" in result
1145assert "requestOutOfRange" in result
1146
1147+ Cleanup
1148
1149= Delete testsockets
1150
1151
# Final cleanup step of the campaign; presumably releases the test sockets created by the tests above - confirm.
1152cleanup_testsockets()
1153