% Regression tests for GMLAN Scanners ~ scanner + Configuration ~ conf = Imports import itertools import logging import threading import time from scapy.contrib.isotp import ISOTPMessageBuilder from test.testsocket import TestSocket, cleanup_testsockets, open_test_sockets ############ ############ + Load general modules = Load contribution layer from scapy.contrib.automotive.gm.gmlan import * conf.contribs['GMLAN']['GMLAN_ECU_AddressingScheme'] = 4 from scapy.contrib.automotive.gm.gmlan_scanner import * from scapy.contrib.automotive.ecu import * load_layer("can") log_automotive.setLevel(logging.DEBUG) = Define Testfunction def executeScannerInVirtualEnvironment(supported_responses, enumerators, **kwargs): tester = TestSocket(GMLAN) ecu = TestSocket(GMLAN) tester.pair(ecu) answering_machine = EcuAnsweringMachine(supported_responses=supported_responses, main_socket=ecu, basecls=GMLAN, verbose=False) def reset(): answering_machine.reset_state() sniff(timeout=0.001, opened_socket=[ecu, tester]) sim = threading.Thread(target=answering_machine, kwargs={ "timeout": 30, "stop_filter": lambda x: bytes(x) == b"\xff\xff\xff"}) sim.start() try: scanner = GMLAN_Scanner( tester, reset_handler=reset, test_cases=enumerators, timeout=0.2, retry_if_none_received=True, unittest=True, **kwargs) def scanner_thread(): for i in range(3): print("Starting scan") scanner.scan(timeout=10) if scanner.scan_completed: print("Scan completed after %d iterations" % i) return scanner_t = threading.Thread(target=scanner_thread) scanner_t.start() scanner_t.join(timeout=120) if scanner_t.is_alive(): scanner.stop_scan() finally: tester.send(Raw(b"\xff\xff\xff")) sim.join(timeout=2) assert not sim.is_alive() cleanup_testsockets() return scanner = Load packets from candump conf.contribs['CAN']['swap-bytes'] = True pkts = rdpcap(scapy_path("test/pcaps/candump_gmlan_scanner.pcap.gz")) assert len(pkts) = Create GMLAN messages from packets builder = ISOTPMessageBuilder(basecls=GMLAN, use_ext_address=False, rx_id=[0x241, 0x641]) msgs = list() for p in pkts: if p.data == b"ECURESET": msgs.append(p) else: builder.feed(p) if len(builder): msgs.append(builder.pop()) assert len(msgs) = Create ECU-Clone from packets mEcu = Ecu(logging=False, verbose=False, store_supported_responses=True) for p in msgs: if isinstance(p, CAN) and p.data == b"ECURESET": mEcu.reset() else: mEcu.update(p) assert len(mEcu.supported_responses) = Test GMLAN_SAEnumerator evaluate_response e = GMLAN_SAEnumerator() config = {} s = EcuState(session=1) debug_dissector_backup = conf.debug_dissector # This tests involves corrupted Packets, therefore we need to disable the debug_dissector conf.debug_dissector = False assert False == e._evaluate_response(s, GMLAN(b"\x27\x01"), None, **config) config = {"exit_if_service_not_supported": True} assert not e._retry_pkt[s] assert True == e._evaluate_response(s, GMLAN(b"\x27\x01"), GMLAN(b"\x7f\x27\x11"), **config) assert not e._retry_pkt[s] assert True == e._evaluate_response(s, GMLAN(b"\x27\x01"), GMLAN(b"\x7f\x27\x22"), **config) assert e._retry_pkt[s] == GMLAN(b"\x27\x01") assert False == e._evaluate_response(s, GMLAN(b"\x27\x02"), GMLAN(b"\x7f\x27\x22"), **config) assert not e._retry_pkt[s] assert True == e._evaluate_response(s, GMLAN(b"\x27\x01"), GMLAN(b"\x7f\x27\x37"), **config) assert e._retry_pkt[s] == GMLAN(b"\x27\x01") assert False == e._evaluate_response(s, GMLAN(b"\x27\x01"), GMLAN(b"\x7f\x27\x37"), **config) assert not e._retry_pkt[s] assert True == e._evaluate_response(s, GMLAN(b"\x27\x01"), GMLAN(b"\x67\x01ab"), **config) assert not e._retry_pkt[s] assert False == e._evaluate_response(s, GMLAN(b"\x27\x01"), GMLAN(b"\x67\x02ab"), **config) assert not e._retry_pkt[s] conf.debug_dissector = debug_dissector_backup = Simulate ECU and run Scanner def securityAccess_Algorithm1(seed): return 0x5F51 keyfunction = securityAccess_Algorithm1 scanner = executeScannerInVirtualEnvironment( mEcu.supported_responses, [GMLAN_IDOEnumerator, GMLAN_PMEnumerator, GMLAN_RDEnumerator, GMLAN_SAEnumerator], GMLAN_SAEnumerator_kwargs={"keyfunction": keyfunction, "scan_range": range(2)}, GMLAN_PMEnumerator_kwargs={"unittest": True}) assert len(scanner.state_paths) == 9 assert scanner.scan_completed assert EcuState(session=1) in scanner.final_states assert EcuState(session=1, security_level=2) in scanner.final_states assert EcuState(session=3, tp=1) in scanner.final_states assert EcuState(session=2, tp=1, communication_control=1) in scanner.final_states assert EcuState(session=2, tp=1, communication_control=1, security_level=2) in scanner.final_states assert EcuState(session=3, tp=1, security_level=2) in scanner.final_states assert EcuState(session=2, tp=1, communication_control=1, security_level=2, request_download=1) in scanner.final_states = Simulate ECU and test GMLAN_RDBIEnumerator resps = [EcuResponse(None, [GMLAN()/GMLAN_RDBIPR(dataIdentifier=1)/Raw(b"asdfbeef1")]), EcuResponse(None, [GMLAN()/GMLAN_RDBIPR(dataIdentifier=2)/Raw(b"beef2")]), EcuResponse(None, [GMLAN()/GMLAN_RDBIPR(dataIdentifier=3)/Raw(b"beef3")]), EcuResponse(None, [GMLAN()/GMLAN_RDBIPR(dataIdentifier=0xff)/Raw(b"beefff")]), EcuResponse(None, [GMLAN()/GMLAN_NR(returnCode="SubFunctionNotSupported", requestServiceId="ReadDataByIdentifier")])] es = [GMLAN_RDBIEnumerator] scanner = executeScannerInVirtualEnvironment(resps, es) assert scanner.scan_completed tc = scanner.configuration.test_cases[0] assert len(tc.results_without_response) < 10 if tc.results_without_response: tc.show() assert len(tc.results_with_negative_response) == 256 - 4 assert len(tc.results_with_positive_response) == 4 assert len(tc.scanned_states) == 1 result = tc.show(dump=True) assert "asdfbeef1" in result assert "beef2" in result assert "beef3" in result assert "beefff" in result assert "SubFunctionNotSupported received" in result ids = [t.req.dataIdentifier for t in tc.results_with_positive_response] assert 1 in ids assert 2 in ids assert 3 in ids assert 0xff in ids = Simulate ECU and test GMLAN_WDBIEnumerator def wdbi_handler(resp, req): if req.service != 0x3b: return False assert req.dataIdentifier in [1, 2, 3, 0xff] resp.dataIdentifier = req.dataIdentifier if req.dataIdentifier == 1: assert req.dataRecord == b'asdfbeef1' return True if req.dataIdentifier == 2: assert req.dataRecord == b'beef2' return True if req.dataIdentifier == 3: assert req.dataRecord == b"beef3" return True if req.dataIdentifier == 0xff: assert req.dataRecord == b"beefff" return True return False resps = [EcuResponse(None, [GMLAN()/GMLAN_RDBIPR(dataIdentifier=1)/Raw(b"asdfbeef1")]), EcuResponse(None, [GMLAN()/GMLAN_RDBIPR(dataIdentifier=2)/Raw(b"beef2")]), EcuResponse(None, [GMLAN()/GMLAN_RDBIPR(dataIdentifier=3)/Raw(b"beef3")]), EcuResponse(None, [GMLAN()/GMLAN_RDBIPR(dataIdentifier=0xff)/Raw(b"beefff")]), EcuResponse(None, [GMLAN()/GMLAN_WDBIPR()], answers=wdbi_handler), EcuResponse(None, [GMLAN()/GMLAN_NR(returnCode="SubFunctionNotSupported", requestServiceId="ReadDataByIdentifier")])] es = [GMLAN_WDBISelectiveEnumerator] scanner = executeScannerInVirtualEnvironment(resps, es) assert scanner.scan_completed tc = scanner.configuration.test_cases[0][0] assert len(tc.results_without_response) < 10 if tc.results_without_response: tc.show() assert len(tc.results_with_negative_response) == 256 - 4 assert len(tc.results_with_positive_response) == 4 assert len(tc.scanned_states) == 1 result = tc.show(dump=True) assert "asdfbeef1" in result assert "beef2" in result assert "beef3" in result assert "beefff" in result assert "SubFunctionNotSupported received" in result ids = [t.req.dataIdentifier for t in tc.results_with_positive_response] assert 1 in ids assert 2 in ids assert 3 in ids assert 0xff in ids ######################### WDBI ############################# tc = scanner.configuration.test_cases[0][1] assert len(tc.results_without_response) < 10 if tc.results_without_response: tc.show() assert len(tc.results_with_negative_response) == 0 assert len(tc.results_with_positive_response) == 4 assert len(tc.scanned_states) == 1 result = tc.show(dump=True) ids = [t.req.dataIdentifier for t in tc.results_with_positive_response] assert 1 in ids assert 2 in ids assert 3 in ids assert 0xff in ids = Simulate ECU and test GMLAN_RDBPIEnumerator resps = [EcuResponse(None, [GMLAN()/GMLAN_RDBPIPR(parameterIdentifier=1)/Raw(b"asdfbeef1")]), EcuResponse(None, [GMLAN()/GMLAN_RDBPIPR(parameterIdentifier=2)/Raw(b"asdfbeef2")]), EcuResponse(None, [GMLAN()/GMLAN_RDBPIPR(parameterIdentifier=3)/Raw(b"beef3")]), EcuResponse(None, [GMLAN()/GMLAN_RDBPIPR(parameterIdentifier=0xffff)/Raw(b"beefffff")]), EcuResponse(None, [GMLAN()/GMLAN_NR(returnCode="SubFunctionNotSupported", requestServiceId="ReadDataByParameterIdentifier")])] es = [GMLAN_RDBPIEnumerator] scanner = executeScannerInVirtualEnvironment(resps, es, GMLAN_RDBPIEnumerator_kwargs={"scan_range":list(range(0x100)) + list(range(0xff00, 0x10000))}) assert scanner.scan_completed tc = scanner.configuration.test_cases[0] assert len(tc.results_without_response) < 10 if tc.results_without_response: tc.show() assert len(tc.results_with_negative_response) == 0x200 - 4 assert len(tc.results_with_positive_response) == 4 assert len(tc.scanned_states) == 1 result = tc.show(dump=True) assert "asdfbeef1" in result assert "asdfbeef2" in result assert "beef3" in result assert "beefffff" in result assert "SubFunctionNotSupported received" in result ids = [t.req.identifiers[0] for t in tc.results_with_positive_response] assert 1 in ids assert 2 in ids assert 3 in ids assert 0xffff in ids = Simulate ECU and test GMLAN_TPEnumerator resps = [EcuResponse(None, [GMLAN(service=0x7e)])] es = [GMLAN_TPEnumerator] scanner = executeScannerInVirtualEnvironment(resps, es) assert scanner.scan_completed tc = scanner.configuration.test_cases[0] assert len(tc.results_without_response) < 10 if tc.results_without_response: tc.show() assert len(tc.results_with_negative_response) == 0 assert len(tc.results_with_positive_response) == 2 assert len(tc.scanned_states) == 2 = Simulate ECU and test GMLAN_DCEnumerator resps = [EcuResponse(None, [GMLAN()/GMLAN_DCPR(CPIDNumber=1)]), EcuResponse(None, [GMLAN()/GMLAN_DCPR(CPIDNumber=2)]), EcuResponse(None, [GMLAN()/GMLAN_DCPR(CPIDNumber=3)/Raw(b"beef3")]), EcuResponse(None, [GMLAN()/GMLAN_DCPR(CPIDNumber=0xff)/Raw(b"beefff")]), EcuResponse(None, [GMLAN()/GMLAN_NR(returnCode="SubFunctionNotSupported", requestServiceId="DeviceControl")])] es = [GMLAN_DCEnumerator] scanner = executeScannerInVirtualEnvironment(resps, es) assert scanner.scan_completed tc = scanner.configuration.test_cases[0] assert len(tc.results_without_response) < 10 if tc.results_without_response: tc.show() assert len(tc.results_with_negative_response) == 256 - 4 assert len(tc.results_with_positive_response) == 4 assert len(tc.scanned_states) == 1 ids = [t.req.CPIDNumber for t in tc.results_with_positive_response] assert 1 in ids assert 2 in ids assert 3 in ids assert 255 in ids result = tc.show(dump=True) assert "SubFunctionNotSupported received " in result = Simulate ECU and test GMLAN_TDEnumerator conf.contribs['GMLAN']['GMLAN_ECU_AddressingScheme'] = 4 positive_responses_left = 4 def answers_td(resp, req): global positive_responses_left if req.service != 0x36: return False if not positive_responses_left: return False positive_responses_left -= 1 resp.service = 0x76 return True resps = [EcuResponse(None, [GMLAN(service="TransferDataPositiveResponse")], answers=answers_td), EcuResponse(None, [GMLAN()/GMLAN_NR(returnCode="RequestOutOfRange", requestServiceId="TransferData")])] es = [GMLAN_TDEnumerator] scanner = executeScannerInVirtualEnvironment(resps, es) assert scanner.scan_completed tc = scanner.configuration.test_cases[0] assert len(tc.results_without_response) < 10 if tc.results_without_response: tc.show() assert len(tc.results_with_negative_response) == 0x1ff - 4 assert len(tc.results_with_positive_response) == 4 assert len(tc.scanned_states) == 1 result = tc.show(dump=True) assert "RequestOutOfRange received " in result = Simulate ECU and test GMLAN_RMBAEnumerator 1 ~ not_pypy conf.contribs['GMLAN']['GMLAN_ECU_AddressingScheme'] = 2 memory = dict() mem_areas = [(0x100, 0x1f00), (0xd000, 0xff00), (0xa000, 0xc000), (0x3000, 0x5f00)] mem_ranges = [range(s, e) for s, e in mem_areas] mem_inner_borders = [s for s, _ in mem_areas] mem_inner_borders += [e - 1 for _, e in mem_areas] mem_outer_borders = [s - 1 for s, _ in mem_areas] mem_outer_borders += [e for _, e in mem_areas] mem_random_test_points = [] for _ in range(100): mem_random_test_points += [random.choice(list(itertools.chain(*mem_ranges)))] for addr in itertools.chain(*mem_ranges): memory[addr] = addr & 0xff def answers_rmba(resp, req): global memory if req.service != 0x23: return False if req.memoryAddress not in memory.keys(): return False out_mem = list() for i in range(req.memoryAddress, req.memoryAddress + req.memorySize): try: out_mem.append(memory[i]) except KeyError: pass resp.memoryAddress = req.memoryAddress resp.dataRecord = bytes(out_mem) return True resps = [EcuResponse(None, [GMLAN()/GMLAN_RMBAPR(memoryAddress=0, dataRecord=b'')], answers=answers_rmba), EcuResponse(None, [GMLAN()/GMLAN_NR(returnCode="RequestOutOfRange", requestServiceId="ReadMemoryByAddress")])] ####################################################### scanner = executeScannerInVirtualEnvironment(resps, [GMLAN_RMBAEnumerator]) assert scanner.scan_completed tc1 = scanner.configuration.test_cases[0] assert len(tc1.results_without_response) < 10 assert len(tc1.results_with_negative_response) > 10 assert len(tc1.results_with_positive_response) > 50 assert len(tc1.scanned_states) == 1 result = tc1.show(dump=True) assert "RequestOutOfRange received " in result def _get_memory_addresses_from_results(results): mem_areas = [ range(tup.req.memoryAddress, tup.req.memoryAddress + tup.req.memorySize) for tup in results] return set(list(itertools.chain.from_iterable(mem_areas))) ############################################################ addrs = _get_memory_addresses_from_results(tc1.results_with_positive_response) print([tp in addrs for tp in mem_inner_borders].count(True) / len(mem_inner_borders)) assert [tp in addrs for tp in mem_inner_borders].count(True) / len(mem_inner_borders) > 0.8 print([tp in addrs for tp in mem_random_test_points].count(True) / len(mem_random_test_points)) assert [tp in addrs for tp in mem_random_test_points].count(True) / len(mem_random_test_points) > 0.8 print([tp not in addrs for tp in mem_outer_borders].count(True) / len(mem_outer_borders)) assert [tp not in addrs for tp in mem_outer_borders].count(True) / len(mem_outer_borders) > 0.8 = Simulate ECU and test GMLAN_RMBAEnumerator 2 * This test takes very long to execute ~ disabled conf.contribs['GMLAN']['GMLAN_ECU_AddressingScheme'] = 3 memory = dict() for addr in itertools.chain(range(0x10000), range(0xf00000, 0xf0f000)): memory[addr] = addr & 0xff resps = [EcuResponse(None, [GMLAN()/GMLAN_RMBAPR(memoryAddress=0, dataRecord=b'')], answers=answers_rmba), EcuResponse(None, [GMLAN()/GMLAN_NR(returnCode="RequestOutOfRange", requestServiceId="ReadMemoryByAddress")])] scanner = executeScannerInVirtualEnvironment(resps, [GMLAN_RMBAEnumerator]) assert scanner.scan_completed tc = scanner.configuration.test_cases[0] assert len(tc.results_without_response) < 10 if tc.results_without_response: tc.show() assert len(tc.results_with_negative_response) > 350 assert len(tc.results_with_positive_response) > 50 assert len(tc.scanned_states) == 1 addrs = [t.req.memoryAddress for t in tc.results_with_positive_response] assert 0 in addrs assert 0x10 in addrs assert 0xf0 in addrs assert 0x3000 in addrs assert 0x3090 in addrs assert 0xa100 in addrs assert 0xa1f0 in addrs assert 0xa200 in addrs assert 0xa2f0 in addrs assert 0xf000 in addrs assert 0xf0f0 in addrs result = tc.show(dump=True) assert "RequestOutOfRange received " in result + Cleanup = Delete TestSockets cleanup_testsockets()