• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1% Regression tests for GMLAN Scanners
2~ scanner
3
4+ Configuration
5~ conf
6
7= Imports
8
9import itertools
10import logging
11import threading
12import time
13from scapy.contrib.isotp import ISOTPMessageBuilder
14
15from test.testsocket import TestSocket, cleanup_testsockets, open_test_sockets
16
17
18############
19############
20+ Load general modules
21
22= Load contribution layer
23
24from scapy.contrib.automotive.gm.gmlan import *
25conf.contribs['GMLAN']['GMLAN_ECU_AddressingScheme'] = 4
26from scapy.contrib.automotive.gm.gmlan_scanner import *
27from scapy.contrib.automotive.ecu import *
28load_layer("can")
29
30log_automotive.setLevel(logging.DEBUG)
31
32
33= Define Testfunction
34
def executeScannerInVirtualEnvironment(supported_responses, enumerators, **kwargs):
    """Run a GMLAN_Scanner against a simulated ECU on two paired TestSockets.
    :param supported_responses: responses given to the EcuAnsweringMachine
        that plays the ECU side.
    :param enumerators: passed to GMLAN_Scanner as ``test_cases``.
    :param kwargs: additional keyword arguments forwarded to GMLAN_Scanner.
    :return: the GMLAN_Scanner object after the scan attempts finished.
    :raises AssertionError: if the ECU simulator thread does not terminate.
    """
    tester = TestSocket(GMLAN)
    ecu = TestSocket(GMLAN)
    tester.pair(ecu)
    answering_machine = EcuAnsweringMachine(
        supported_responses=supported_responses, main_socket=ecu,
        basecls=GMLAN, verbose=False)
    def reset():
        # Reset handler given to the scanner: reset the simulator state and
        # sniff briefly on both sockets (presumably to drop stale frames).
        answering_machine.reset_state()
        sniff(timeout=0.001, opened_socket=[ecu, tester])
    # The simulator ends after its timeout or when it sees the stop frame.
    sim = threading.Thread(target=answering_machine, kwargs={
        "timeout": 30, "stop_filter": lambda x: bytes(x) == b"\xff\xff\xff"})
    sim.start()
    try:
        scanner = GMLAN_Scanner(
            tester, reset_handler=reset,
            test_cases=enumerators, timeout=0.2, retry_if_none_received=True,
            unittest=True, **kwargs)
        # Set when the overall time budget is exhausted, so the retry loop
        # does not start another scan after stop_scan() was requested.
        abort = threading.Event()
        def scanner_thread():
            for attempt in range(1, 4):
                if abort.is_set():
                    return
                print("Starting scan")
                scanner.scan(timeout=10)
                if scanner.scan_completed:
                    print("Scan completed after %d iterations" % attempt)
                    return
        scanner_t = threading.Thread(target=scanner_thread)
        scanner_t.start()
        scanner_t.join(timeout=120)
        if scanner_t.is_alive():
            abort.set()
            scanner.stop_scan()
            # Give the scanner a chance to leave the sockets alone before
            # they are torn down below.
            scanner_t.join(timeout=10)
    finally:
        try:
            # Stop frame for the simulator, see stop_filter above.
            tester.send(Raw(b"\xff\xff\xff"))
            sim.join(timeout=2)
            assert not sim.is_alive()
        finally:
            # Always release the test sockets, even if stopping the
            # simulator failed.
            cleanup_testsockets()
    return scanner
69
70= Load packets from candump
71
# Enable the CAN layer's 'swap-bytes' option before the capture is read.
conf.contribs['CAN']['swap-bytes'] = True
candump_file = scapy_path("test/pcaps/candump_gmlan_scanner.pcap.gz")
pkts = rdpcap(candump_file)
# The capture has to contain at least one packet.
assert len(pkts) > 0
75
76= Create GMLAN messages from packets
77
builder = ISOTPMessageBuilder(
    basecls=GMLAN, use_ext_address=False, rx_id=[0x241, 0x641])
msgs = []

# Frames whose payload is the literal marker b"ECURESET" are kept as they
# are; every other frame is fed to the ISOTP message builder and a message
# is collected whenever the builder holds one.
for frame in pkts:
    if frame.data == b"ECURESET":
        msgs.append(frame)
        continue
    builder.feed(frame)
    if len(builder) > 0:
        msgs.append(builder.pop())

assert len(msgs) > 0
90
91= Create ECU-Clone from packets
92
mEcu = Ecu(logging=False, verbose=False, store_supported_responses=True)

# Replay the recorded messages into the Ecu object; the b"ECURESET" marker
# frames trigger a reset instead of an update.
for msg in msgs:
    is_reset_marker = isinstance(msg, CAN) and msg.data == b"ECURESET"
    if is_reset_marker:
        mEcu.reset()
        continue
    mEcu.update(msg)

assert len(mEcu.supported_responses) > 0
102
103= Test GMLAN_SAEnumerator evaluate_response
104
e = GMLAN_SAEnumerator()

config = {}

s = EcuState(session=1)

debug_dissector_backup = conf.debug_dissector

# This test involves corrupted packets, therefore the debug_dissector has to
# be disabled. The try/finally restores the previous setting even if one of
# the assertions fails, so a failure here cannot change the configuration
# seen by the following tests.
conf.debug_dissector = False

try:
    assert False == e._evaluate_response(s, GMLAN(b"\x27\x01"), None, **config)
    config = {"exit_if_service_not_supported": True}
    assert not e._retry_pkt[s]
    assert True == e._evaluate_response(s, GMLAN(b"\x27\x01"), GMLAN(b"\x7f\x27\x11"), **config)
    assert not e._retry_pkt[s]
    assert True == e._evaluate_response(s, GMLAN(b"\x27\x01"), GMLAN(b"\x7f\x27\x22"), **config)
    assert e._retry_pkt[s] == GMLAN(b"\x27\x01")
    assert False == e._evaluate_response(s, GMLAN(b"\x27\x02"), GMLAN(b"\x7f\x27\x22"), **config)
    assert not e._retry_pkt[s]
    assert True == e._evaluate_response(s, GMLAN(b"\x27\x01"), GMLAN(b"\x7f\x27\x37"), **config)
    assert e._retry_pkt[s] == GMLAN(b"\x27\x01")
    assert False == e._evaluate_response(s, GMLAN(b"\x27\x01"), GMLAN(b"\x7f\x27\x37"), **config)
    assert not e._retry_pkt[s]
    assert True == e._evaluate_response(s, GMLAN(b"\x27\x01"), GMLAN(b"\x67\x01ab"), **config)
    assert not e._retry_pkt[s]
    assert False == e._evaluate_response(s, GMLAN(b"\x27\x01"), GMLAN(b"\x67\x02ab"), **config)
    assert not e._retry_pkt[s]
finally:
    conf.debug_dissector = debug_dissector_backup
134
135
136= Simulate ECU and run Scanner
137
def securityAccess_Algorithm1(seed):
    """Key function for the scanner test: returns the same key for any seed."""
    fixed_key = 0x5F51
    return fixed_key

keyfunction = securityAccess_Algorithm1
142
# Scan the cloned ECU with four enumerators; the SecurityAccess enumerator
# gets the key function and a scan range of two values.
scanner_test_cases = [GMLAN_IDOEnumerator, GMLAN_PMEnumerator, GMLAN_RDEnumerator, GMLAN_SAEnumerator]
scanner = executeScannerInVirtualEnvironment(
    mEcu.supported_responses,
    scanner_test_cases,
    GMLAN_SAEnumerator_kwargs={"keyfunction": keyfunction, "scan_range": range(2)},
    GMLAN_PMEnumerator_kwargs={"unittest": True})

assert len(scanner.state_paths) == 9
assert scanner.scan_completed

# Every one of these ECU states has to be among the scanner's final states.
expected_final_states = [
    EcuState(session=1),
    EcuState(session=1, security_level=2),
    EcuState(session=3, tp=1),
    EcuState(session=2, tp=1, communication_control=1),
    EcuState(session=2, tp=1, communication_control=1, security_level=2),
    EcuState(session=3, tp=1, security_level=2),
    EcuState(session=2, tp=1, communication_control=1, security_level=2, request_download=1)]

for expected_state in expected_final_states:
    assert expected_state in scanner.final_states
159
160= Simulate ECU and test GMLAN_RDBIEnumerator
161
# Four positive ReadDataByIdentifier responses, followed by one
# SubFunctionNotSupported negative response for the same service.
resps = [
    EcuResponse(None, [GMLAN()/GMLAN_RDBIPR(dataIdentifier=did)/Raw(data)])
    for did, data in [(1, b"asdfbeef1"), (2, b"beef2"), (3, b"beef3"), (0xff, b"beefff")]]
resps += [EcuResponse(None, [GMLAN()/GMLAN_NR(returnCode="SubFunctionNotSupported", requestServiceId="ReadDataByIdentifier")])]

es = [GMLAN_RDBIEnumerator]

scanner = executeScannerInVirtualEnvironment(resps, es)

assert scanner.scan_completed
tc = scanner.configuration.test_cases[0]

# A few unanswered requests are tolerated; show them for debugging.
assert len(tc.results_without_response) < 10
if tc.results_without_response:
    tc.show()

assert len(tc.results_with_negative_response) == 252
assert len(tc.results_with_positive_response) == 4
assert len(tc.scanned_states) == 1

result = tc.show(dump=True)

for expected_text in ("asdfbeef1", "beef2", "beef3", "beefff", "SubFunctionNotSupported received"):
    assert expected_text in result

ids = {t.req.dataIdentifier for t in tc.results_with_positive_response}

assert {1, 2, 3, 0xff} <= ids
198
199
200= Simulate ECU and test GMLAN_WDBIEnumerator
201
def wdbi_handler(resp, req):
    """Answer callback for service 0x3b requests of the WDBI test.
    Returns False for any other service. Otherwise the request must carry
    one of the known identifiers together with its expected data record
    (checked with assert); the identifier is copied into *resp* and True
    is returned.
    """
    if req.service != 0x3b:
        return False
    expected_records = {
        1: b'asdfbeef1',
        2: b'beef2',
        3: b"beef3",
        0xff: b"beefff",
    }
    assert req.dataIdentifier in expected_records
    # The identifier is mirrored before the record is verified.
    resp.dataIdentifier = req.dataIdentifier
    assert req.dataRecord == expected_records[req.dataIdentifier]
    return True
220
# Four positive ReadDataByIdentifier responses, a WriteDataByIdentifier
# positive response checked by wdbi_handler, and a SubFunctionNotSupported
# negative response.
resps = [
    EcuResponse(None, [GMLAN()/GMLAN_RDBIPR(dataIdentifier=did)/Raw(data)])
    for did, data in [(1, b"asdfbeef1"), (2, b"beef2"), (3, b"beef3"), (0xff, b"beefff")]]
resps += [
    EcuResponse(None, [GMLAN()/GMLAN_WDBIPR()], answers=wdbi_handler),
    EcuResponse(None, [GMLAN()/GMLAN_NR(returnCode="SubFunctionNotSupported", requestServiceId="ReadDataByIdentifier")])]

es = [GMLAN_WDBISelectiveEnumerator]

scanner = executeScannerInVirtualEnvironment(resps, es)

assert scanner.scan_completed
# First element of the first test case entry: checked like the RDBI test.
tc = scanner.configuration.test_cases[0][0]

assert len(tc.results_without_response) < 10
if tc.results_without_response:
    tc.show()

assert len(tc.results_with_negative_response) == 252
assert len(tc.results_with_positive_response) == 4
assert len(tc.scanned_states) == 1

result = tc.show(dump=True)

for expected_text in ("asdfbeef1", "beef2", "beef3", "beefff", "SubFunctionNotSupported received"):
    assert expected_text in result

ids = {t.req.dataIdentifier for t in tc.results_with_positive_response}

assert {1, 2, 3, 0xff} <= ids
258
259######################### WDBI #############################
# Second element of the first test case entry: only the four identifiers
# may have been written, all of them with a positive response.
tc = scanner.configuration.test_cases[0][1]

assert len(tc.results_without_response) < 10
if tc.results_without_response:
    tc.show()

negative_count = len(tc.results_with_negative_response)
positive_count = len(tc.results_with_positive_response)
assert negative_count == 0
assert positive_count == 4
assert len(tc.scanned_states) == 1

result = tc.show(dump=True)

ids = {t.req.dataIdentifier for t in tc.results_with_positive_response}

assert {1, 2, 3, 0xff} <= ids
279
280
281= Simulate ECU and test GMLAN_RDBPIEnumerator
282
# Four positive ReadDataByParameterIdentifier responses, followed by one
# SubFunctionNotSupported negative response for the same service.
resps = [
    EcuResponse(None, [GMLAN()/GMLAN_RDBPIPR(parameterIdentifier=pid)/Raw(data)])
    for pid, data in [(1, b"asdfbeef1"), (2, b"asdfbeef2"), (3, b"beef3"), (0xffff, b"beefffff")]]
resps += [EcuResponse(None, [GMLAN()/GMLAN_NR(returnCode="SubFunctionNotSupported", requestServiceId="ReadDataByParameterIdentifier")])]

es = [GMLAN_RDBPIEnumerator]

# Only the lowest and the highest 0x100 parameter identifiers are scanned.
rdbpi_scan_range = list(range(0x100)) + list(range(0xff00, 0x10000))
scanner = executeScannerInVirtualEnvironment(
    resps, es, GMLAN_RDBPIEnumerator_kwargs={"scan_range": rdbpi_scan_range})

assert scanner.scan_completed
tc = scanner.configuration.test_cases[0]

assert len(tc.results_without_response) < 10
if tc.results_without_response:
    tc.show()

assert len(tc.results_with_negative_response) == 508
assert len(tc.results_with_positive_response) == 4
assert len(tc.scanned_states) == 1

result = tc.show(dump=True)

for expected_text in ("asdfbeef1", "asdfbeef2", "beef3", "beefffff", "SubFunctionNotSupported received"):
    assert expected_text in result

ids = {t.req.identifiers[0] for t in tc.results_with_positive_response}

assert {1, 2, 3, 0xffff} <= ids
319
320= Simulate ECU and test GMLAN_TPEnumerator
321
# The simulated ECU knows a single response with service 0x7e.
tp_response = GMLAN(service=0x7e)
resps = [EcuResponse(None, [tp_response])]

es = [GMLAN_TPEnumerator]
scanner = executeScannerInVirtualEnvironment(resps, es)

assert scanner.scan_completed
tc = scanner.configuration.test_cases[0]

assert len(tc.results_without_response) < 10
if tc.results_without_response:
    tc.show()

negative_count = len(tc.results_with_negative_response)
positive_count = len(tc.results_with_positive_response)
assert negative_count == 0
assert positive_count == 2
assert len(tc.scanned_states) == 2
338
339
340= Simulate ECU and test GMLAN_DCEnumerator
341
# Four positive DeviceControl responses (two of them with extra payload)
# and one SubFunctionNotSupported negative response.
dc_positive_pkts = [
    GMLAN()/GMLAN_DCPR(CPIDNumber=1),
    GMLAN()/GMLAN_DCPR(CPIDNumber=2),
    GMLAN()/GMLAN_DCPR(CPIDNumber=3)/Raw(b"beef3"),
    GMLAN()/GMLAN_DCPR(CPIDNumber=0xff)/Raw(b"beefff")]
dc_negative_pkt = GMLAN()/GMLAN_NR(returnCode="SubFunctionNotSupported", requestServiceId="DeviceControl")
resps = [EcuResponse(None, [pkt]) for pkt in dc_positive_pkts + [dc_negative_pkt]]

es = [GMLAN_DCEnumerator]
scanner = executeScannerInVirtualEnvironment(resps, es)

assert scanner.scan_completed
tc = scanner.configuration.test_cases[0]

assert len(tc.results_without_response) < 10
if tc.results_without_response:
    tc.show()

assert len(tc.results_with_negative_response) == 252
assert len(tc.results_with_positive_response) == 4
assert len(tc.scanned_states) == 1

ids = {t.req.CPIDNumber for t in tc.results_with_positive_response}

assert {1, 2, 3, 255} <= ids

result = tc.show(dump=True)

assert "SubFunctionNotSupported received " in result
373
374= Simulate ECU and test GMLAN_TDEnumerator
375
376conf.contribs['GMLAN']['GMLAN_ECU_AddressingScheme'] = 4
377
positive_responses_left = 4

def answers_td(resp, req):
    """Answer callback for service 0x36 requests with a limited budget.
    While the module-level counter ``positive_responses_left`` is not zero,
    a request with service 0x36 decrements it, sets ``resp.service`` to
    0x76 and yields True. Everything else yields False.
    """
    global positive_responses_left
    may_answer = req.service == 0x36 and positive_responses_left
    if not may_answer:
        return False
    positive_responses_left -= 1
    resp.service = 0x76
    return True
389
# A positive response gated by answers_td and a RequestOutOfRange negative
# response for TransferData.
td_positive = EcuResponse(None, [GMLAN(service="TransferDataPositiveResponse")], answers=answers_td)
td_negative = EcuResponse(None, [GMLAN()/GMLAN_NR(returnCode="RequestOutOfRange", requestServiceId="TransferData")])
resps = [td_positive, td_negative]

es = [GMLAN_TDEnumerator]
scanner = executeScannerInVirtualEnvironment(resps, es)

assert scanner.scan_completed
tc = scanner.configuration.test_cases[0]

assert len(tc.results_without_response) < 10
if tc.results_without_response:
    tc.show()

assert len(tc.results_with_negative_response) == 507
assert len(tc.results_with_positive_response) == 4
assert len(tc.scanned_states) == 1

result = tc.show(dump=True)

assert "RequestOutOfRange received " in result
411
412= Simulate ECU and test GMLAN_RMBAEnumerator 1
413~ not_pypy
414
415conf.contribs['GMLAN']['GMLAN_ECU_AddressingScheme'] = 2
416
# Readable memory areas of the simulated ECU as (start, end) pairs, end
# exclusive.
mem_areas = [(0x100, 0x1f00), (0xd000, 0xff00), (0xa000, 0xc000), (0x3000, 0x5f00)]

mem_ranges = [range(start, end) for start, end in mem_areas]

# Flat list of every readable address. It is built once here instead of
# once per random sample.
all_addresses = list(itertools.chain.from_iterable(mem_ranges))

# First and last readable address of every area.
mem_inner_borders = [start for start, _ in mem_areas] + [end - 1 for _, end in mem_areas]

# Addresses directly before and directly after every area.
mem_outer_borders = [start - 1 for start, _ in mem_areas] + [end for _, end in mem_areas]

# 100 random readable addresses.
mem_random_test_points = [random.choice(all_addresses) for _ in range(100)]

# Every readable byte holds the low 8 bits of its own address.
memory = {addr: addr & 0xff for addr in all_addresses}
435
def answers_rmba(resp, req):
    """Answer callback for service 0x23 requests, backed by ``memory``.
    Returns False for other services and for start addresses that are not
    a key of the module-level ``memory`` dict. Otherwise the start address
    is copied to *resp*, ``resp.dataRecord`` receives the stored values of
    the requested address window (addresses missing from ``memory`` are
    skipped) and True is returned.
    """
    if req.service != 0x23:
        return False
    first = req.memoryAddress
    if first not in memory:
        return False
    window = range(first, first + req.memorySize)
    stored = [memory[address] for address in window if address in memory]
    resp.memoryAddress = first
    resp.dataRecord = bytes(stored)
    return True
451
# A positive response filled in by answers_rmba and a RequestOutOfRange
# negative response for ReadMemoryByAddress.
rmba_positive = EcuResponse(None, [GMLAN()/GMLAN_RMBAPR(memoryAddress=0, dataRecord=b'')], answers=answers_rmba)
rmba_negative = EcuResponse(None, [GMLAN()/GMLAN_NR(returnCode="RequestOutOfRange", requestServiceId="ReadMemoryByAddress")])
resps = [rmba_positive, rmba_negative]

scanner = executeScannerInVirtualEnvironment(resps, [GMLAN_RMBAEnumerator])

assert scanner.scan_completed
tc1 = scanner.configuration.test_cases[0]

unanswered_count = len(tc1.results_without_response)
negative_count = len(tc1.results_with_negative_response)
positive_count = len(tc1.results_with_positive_response)
assert unanswered_count < 10
assert negative_count > 10
assert positive_count > 50
assert len(tc1.scanned_states) == 1

result = tc1.show(dump=True)

assert "RequestOutOfRange received " in result
469
470
471def _get_memory_addresses_from_results(results):
472    mem_areas = [
473        range(tup.req.memoryAddress, tup.req.memoryAddress + tup.req.memorySize)
474        for tup in results]
475    return set(list(itertools.chain.from_iterable(mem_areas)))
476
477############################################################
478
addrs = _get_memory_addresses_from_results(tc1.results_with_positive_response)

# Share of inner border addresses that were read successfully.
inner_border_ratio = sum(tp in addrs for tp in mem_inner_borders) / len(mem_inner_borders)
print(inner_border_ratio)
assert inner_border_ratio > 0.8
# Share of random readable addresses that were read successfully.
random_point_ratio = sum(tp in addrs for tp in mem_random_test_points) / len(mem_random_test_points)
print(random_point_ratio)
assert random_point_ratio > 0.8
# Share of outer border addresses that were NOT read.
outer_border_ratio = sum(tp not in addrs for tp in mem_outer_borders) / len(mem_outer_borders)
print(outer_border_ratio)
assert outer_border_ratio > 0.8
487
488
489= Simulate ECU and test GMLAN_RMBAEnumerator 2
490* This test takes very long to execute
491
492~ disabled
493
conf.contribs['GMLAN']['GMLAN_ECU_AddressingScheme'] = 3

# Rebuild the simulated memory with two address windows; every byte holds
# the low 8 bits of its own address.
memory = {
    address: address & 0xff
    for address in itertools.chain(range(0x10000), range(0xf00000, 0xf0f000))}

rmba_positive = EcuResponse(None, [GMLAN()/GMLAN_RMBAPR(memoryAddress=0, dataRecord=b'')], answers=answers_rmba)
rmba_negative = EcuResponse(None, [GMLAN()/GMLAN_NR(returnCode="RequestOutOfRange", requestServiceId="ReadMemoryByAddress")])
resps = [rmba_positive, rmba_negative]

scanner = executeScannerInVirtualEnvironment(resps, [GMLAN_RMBAEnumerator])

assert scanner.scan_completed
tc = scanner.configuration.test_cases[0]

assert len(tc.results_without_response) < 10
if tc.results_without_response:
    tc.show()

assert len(tc.results_with_negative_response) > 350
assert len(tc.results_with_positive_response) > 50
assert len(tc.scanned_states) == 1

addrs = [t.req.memoryAddress for t in tc.results_with_positive_response]

# Start addresses that must have been requested with a positive response.
expected_addrs = (0, 0x10, 0xf0, 0x3000, 0x3090, 0xa100, 0xa1f0, 0xa200, 0xa2f0, 0xf000, 0xf0f0)
for expected_addr in expected_addrs:
    assert expected_addr in addrs

result = tc.show(dump=True)

assert "RequestOutOfRange received " in result
535
536+ Cleanup
537
538= Delete TestSockets
539
540cleanup_testsockets()
541