% Regression tests for Simulated ECUs and UDS Scanners ~ scanner + Configuration ~ conf = Imports import io import pickle from scapy.contrib.isotp import ISOTPMessageBuilder from test.testsocket import TestSocket, cleanup_testsockets, UnstableSocket from scapy.automaton import ObjectPipe ############ ############ + Load general modules = Load contribution layer from scapy.contrib.automotive.uds import * from scapy.contrib.automotive.uds_ecu_states import * from scapy.contrib.automotive.uds_scan import * from scapy.contrib.automotive.ecu import * load_layer("can") conf.debug_dissector = False = Define Testfunction def executeScannerInVirtualEnvironment(supported_responses, enumerators, unstable_socket=True, **kwargs): tester_obj_pipe = ObjectPipe(name="TesterPipe") ecu_obj_pipe = ObjectPipe(name="ECUPipe") TesterSocket = UnstableSocket if unstable_socket else TestSocket tester = TesterSocket(UDS, tester_obj_pipe) ecu = TestSocket(UDS, ecu_obj_pipe) tester.pair(ecu) answering_machine = EcuAnsweringMachine( supported_responses=supported_responses, main_socket=ecu, basecls=UDS, verbose=False) def reset(): answering_machine.state.reset() answering_machine.state["session"] = 1 sniff(timeout=0.001, opened_socket=[ecu, tester]) def reconnect(): try: tester.close() except Exception: pass tester = TesterSocket(UDS, tester_obj_pipe) ecu.pair(tester) return tester def answering_machine_thread(): answering_machine( timeout=120, stop_filter=lambda x: bytes(x) == b"\xff\xff\xff") sim = threading.Thread(target=answering_machine_thread) try: sim.start() scanner = UDS_Scanner( tester, reset_handler=reset, reconnect_handler=reconnect, test_cases=enumerators, timeout=0.1, retry_if_none_received=True, unittest=True, **kwargs) for i in range(12): print("Starting scan") scanner.scan(timeout=10) if scanner.scan_completed: print("Scan completed after %d iterations" % i) break finally: ecu.ins.send(Raw(b"\xff\xff\xff")) sim.join(timeout=2) assert not sim.is_alive() cleanup_testsockets() tester_obj_pipe.close() ecu_obj_pipe.close() if LINUX: pickle_test(scanner) return scanner def pickle_test(scanner): f = io.BytesIO() pickle.dump(scanner, f) unp = pickle.loads(f.getvalue()) assert scanner.scan_completed == unp.scan_completed assert scanner.state_paths == unp.state_paths = Load packets from pcap conf.contribs['CAN']['swap-bytes'] = True pkts = rdpcap(scapy_path("test/pcaps/candump_uds_scanner.pcap.gz")) assert len(pkts) = Create UDS messages from packets builder = ISOTPMessageBuilder(basecls=UDS, use_ext_address=False, rx_id=[0x641, 0x651]) msgs = list() for p in pkts: if p.data == b"ECURESET": msgs.append(p) else: builder.feed(p) if len(builder): msgs.append(builder.pop()) assert len(msgs) = Create ECU-Clone from packets mEcu = Ecu(logging=False, verbose=False, store_supported_responses=True, lookahead=3) for p in msgs: if isinstance(p, CAN) and p.data == b"ECURESET": mEcu.reset() else: mEcu.update(p) assert len(mEcu.supported_responses) = Test UDS_SAEnumerator evaluate_response e = UDS_SAEnumerator() config = {} s = EcuState(session=1) assert False == e._evaluate_response(s, UDS(b"\x27\x01"), None, **config) config = {"exit_if_service_not_supported": True} assert not e._retry_pkt[s] assert True == e._evaluate_response(s, UDS(b"\x27\x01"), UDS(b"\x7f\x27\x11"), **config) assert not e._retry_pkt[s] assert True == e._evaluate_response(s, UDS(b"\x27\x01"), UDS(b"\x7f\x27\x24"), **config) assert e._retry_pkt[s] == UDS(b"\x27\x01") assert False == e._evaluate_response(s, UDS(b"\x27\x02"), UDS(b"\x7f\x27\x24"), **config) assert not e._retry_pkt[s] assert True == e._evaluate_response(s, UDS(b"\x27\x01"), UDS(b"\x7f\x27\x37"), **config) assert e._retry_pkt[s] == UDS(b"\x27\x01") assert False == e._evaluate_response(s, UDS(b"\x27\x01"), UDS(b"\x7f\x27\x37"), **config) assert not e._retry_pkt[s] assert True == e._evaluate_response(s, UDS(b"\x27\x01"), UDS(b"\x67\x01ab"), **config) assert not e._retry_pkt[s] assert False == e._evaluate_response(s, UDS(b"\x27\x01"), UDS(b"\x67\x02ab"), **config) assert not e._retry_pkt[s] = Test UDS_SA_XOR_Enumerator stand alone mode TesterSocket = TestSocket ecu_sock = TestSocket(UDS) mTester = TesterSocket(UDS) ecu_sock.pair(mTester) answering_machine = EcuAnsweringMachine(supported_responses=mEcu.supported_responses, main_socket=ecu_sock, basecls=UDS, verbose=False) sim = threading.Thread(target=answering_machine, kwargs={'timeout': 1000, "stop_filter": lambda x: bytes(x) == b"\xff\xff\xff"}) sim.start() try: resp = mTester.sr1(UDS()/UDS_TP(b"\x00"), verbose=False, timeout=1) print(repr(resp)) assert resp and resp.service != 0x7f resp = mTester.sr1(UDS()/UDS_DSC(diagnosticSessionType=3), verbose=False, timeout=1) print(repr(resp)) assert resp and resp.service != 0x7f assert UDS_SA_XOR_Enumerator().get_security_access(mTester, 1) finally: mTester.send(Raw(b"\xff\xff\xff")) sim.join(timeout=2) cleanup_testsockets() = Test configuration validation try: scanner = UDS_Scanner(TestSocket(UDS), test_cases=[UDS_SA_XOR_Enumerator, UDS_DSCEnumerator, UDS_ServiceEnumerator], UDS_DSCEnumerator_kwargs={"scan_range": range(0x1000), "delay_state_change": 0, "overwrite_timeout": False}) assert False except Scapy_Exception: pass = Simulate ECU and run Scanner scanner = executeScannerInVirtualEnvironment( mEcu.supported_responses, [UDS_SA_XOR_Enumerator, UDS_DSCEnumerator, UDS_ServiceEnumerator], UDS_DSCEnumerator_kwargs={"scan_range": range(5), "delay_state_change": 0, "overwrite_timeout": False}, UDS_SA_XOR_Enumerator_kwargs={"scan_range": range(5)}, UDS_ServiceEnumerator_kwargs={"scan_range": [0x10, 0x11, 0x14, 0x19, 0x22, 0x23, 0x24, 0x27, 0x28, 0x29, 0x2A, 0x2C, 0x2E, 0x2F, 0x31, 0x34, 0x35, 0x36, 0x37, 0x38, 0x3D, 0x3E, 0x83, 0x84, 0x85, 0x87], "request_length": 1}) scanner.show_testcases() scanner.show_testcases_status() assert len(scanner.state_paths) == 5 assert scanner.scan_completed assert scanner.progress() > 0.95 assert EcuState(session=1) in scanner.final_states assert EcuState(session=2, tp=1) in scanner.final_states assert EcuState(session=3, tp=1) in scanner.final_states assert EcuState(session=2, tp=1, security_level=2) in scanner.final_states assert EcuState(session=3, tp=1, security_level=2) in scanner.final_states #################### UDS_SA_XOR_Enumerator ################ tc = scanner.configuration.test_cases[0] assert len(tc.results_without_response) < 10 if tc.results_without_response: tc.show() assert len(tc.results_with_negative_response) == 19 assert len(tc.results_with_positive_response) >= 6 assert len(tc.scanned_states) == 5 result = tc.show(dump=True) assert "serviceNotSupportedInActiveSession received 5 times" in result assert "incorrectMessageLengthOrInvalidFormat received 14 times" in result ################# UDS_DSCEnumerator ##################### tc = scanner.configuration.test_cases[1] assert len(tc.results_without_response) < 10 if tc.results_without_response: tc.show() assert len(tc.results_with_negative_response) == 20 assert len(tc.results_with_positive_response) == 5 assert len(tc.scanned_states) == 5 result = tc.show(dump=True) assert "incorrectMessageLengthOrInvalidFormat received 20 times" in result ###################### UDS_ServiceEnumerator ################### tc = scanner.configuration.test_cases[2] assert len(tc.results_without_response) < 10 if tc.results_without_response: tc.show() assert len(tc.results_with_negative_response) == 130 assert len(tc.results_with_positive_response) == 0 assert len(tc.scanned_states) == 5 result = tc.show(dump=True) assert "incorrectMessageLengthOrInvalidFormat received 34 times" in result assert "serviceNotSupported received 75 times" in result assert "serviceNotSupportedInActiveSession received 19 times" in result assert "securityAccessDenied received 2 times" in result = UDS_ServiceEnumerator def req_handler(resp, req): if req.service != 0x22: return False if len(req) == 1: resp.negativeResponseCode="generalReject" return True if len(req) == 2: resp.negativeResponseCode="incorrectMessageLengthOrInvalidFormat" return True if len(req) == 3: resp.negativeResponseCode="requestOutOfRange" return True return False resps = [EcuResponse(None, [UDS()/UDS_NR(negativeResponseCode="subFunctionNotSupported", requestServiceId="ReadDataByIdentifier")], req_handler)] es = [UDS_ServiceEnumerator] debug_dissector_backup = conf.debug_dissector # This Enumerator is sending corrupted Packets, therefore we need to disable the debug_dissector conf.debug_dissector = False scanner = executeScannerInVirtualEnvironment( resps, es, UDS_ServiceEnumerator_kwargs={"request_length": 3}, unstable_socket=False) conf.debug_dissector = debug_dissector_backup assert scanner.scan_completed assert scanner.progress() > 0.95 tc = scanner.configuration.test_cases[0] assert len(tc.results_without_response) < 10 if tc.results_without_response: tc.show() tc.show() assert len(tc.results_with_negative_response) == 128 * 3 assert len(tc.results_with_positive_response) == 0 assert len(tc.scanned_states) == 1 result = tc.show(dump=True) assert "incorrectMessageLengthOrInvalidFormat" in result assert "requestOutOfRange" in result = UDS_RDBIEnumerator resps = [EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=1)/Raw(b"asdfbeef1")]), EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=2)/Raw(b"beef2")]), EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=3)/Raw(b"beef3")]), EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0xff)/Raw(b"beefff")]), EcuResponse(None, [UDS()/UDS_NR(negativeResponseCode="subFunctionNotSupported", requestServiceId="ReadDataByIdentifier")])] es = [UDS_RDBIEnumerator] scanner = executeScannerInVirtualEnvironment( resps, es, UDS_RDBIEnumerator_kwargs={"scan_range": range(0x100)}) assert scanner.scan_completed assert scanner.progress() > 0.95 tc = scanner.configuration.test_cases[0] assert len(tc.results_without_response) < 10 if tc.results_without_response: tc.show() assert len(tc.results_with_negative_response) == 256 - 4 assert len(tc.results_with_positive_response) == 4 assert len(tc.scanned_states) == 1 result = tc.show(dump=True) assert "asdfbeef1" in result assert "beef2" in result assert "beef3" in result assert "beefff" in result assert "subFunctionNotSupported received" in result ids = [t.req.identifiers[0] for t in tc.results_with_positive_response] assert 1 in ids assert 2 in ids assert 3 in ids assert 0xff in ids = UDS_RDBISelectiveEnumerator ~ not_pypy resps = [EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x101)/Raw(b"asdfbeef1")]), EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x102)/Raw(b"beef2")]), EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x103)/Raw(b"beef3")]), EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x104)/Raw(b"beef3")]), EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x105)/Raw(b"beef3")]), EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x106)/Raw(b"beef3")]), EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x107)/Raw(b"beef3")]), EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x108)/Raw(b"beef3")]), EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x109)/Raw(b"beef3")]), EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x110)/Raw(b"beef3")]), EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x111)/Raw(b"beef3")]), EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x112)/Raw(b"beef3")]), EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x113)/Raw(b"beef3")]), EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x114)/Raw(b"beef3")]), EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x115)/Raw(b"beef3")]), EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x116)/Raw(b"beef3")]), EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x117)/Raw(b"beef3")]), EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x118)/Raw(b"beef3")]), EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x119)/Raw(b"beef3")]), EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x120)/Raw(b"beef3")]), EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x121)/Raw(b"beef3")]), EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x122)/Raw(b"beef3")]), EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x123)/Raw(b"beef3")]), EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x124)/Raw(b"beef3")]), EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x125)/Raw(b"beef3")]), EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x126)/Raw(b"beef3")]), EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x127)/Raw(b"beef3")]), EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x128)/Raw(b"beef3")]), EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x129)/Raw(b"beef3")]), EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x130)/Raw(b"beef3")]), EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x131)/Raw(b"beef3")]), EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x132)/Raw(b"beef3")]), EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x133)/Raw(b"beef3")]), EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x134)/Raw(b"beef3")]), EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x135)/Raw(b"beef35")]), EcuResponse(None, [UDS()/UDS_NR(negativeResponseCode="subFunctionNotSupported", requestServiceId="ReadDataByIdentifier")])] es = [UDS_RDBISelectiveEnumerator] scanner = executeScannerInVirtualEnvironment(resps, es, UDS_RDBIRandomEnumerator_kwargs={"probe_start": 0, "probe_end": 0x500}) assert scanner.scan_completed assert scanner.progress() > 0.95 tc = scanner.configuration.stages[0][0] tc.show() assert len(tc.results_without_response) < 10 if tc.results_without_response: tc.show() assert len(tc.results_with_negative_response) > 100 assert len(tc.results_with_positive_response) >= 1 assert len(tc.scanned_states) == 1 assert scanner.scan_completed assert scanner.progress() > 0.95 tc = scanner.configuration.stages[0][1] tc.show() assert len(tc.results_without_response) < 10 if tc.results_without_response: tc.show() assert len(tc.results_with_negative_response) == 29 assert len(tc.results_with_positive_response) == 35 assert len(tc.scanned_states) == 1 result = tc.show(dump=True) assert "asdfbeef1" in result assert "beef2" in result assert "beef3" in result assert "beef35" in result assert "subFunctionNotSupported received" in result ids = [t.req.identifiers[0] for t in tc.results_with_positive_response] assert 0x101 in ids assert 0x102 in ids assert 0x103 in ids assert 0x135 in ids = UDS_WDBIEnumerator def wdbi_handler(resp, req): if req.service != 0x2E: return False assert req.dataIdentifier in [1, 2, 3, 0xff] resp.dataIdentifier = req.dataIdentifier if req.dataIdentifier == 1: assert req.load == b'asdfbeef1' return True if req.dataIdentifier == 2: assert req.load == b'beef2' return True if req.dataIdentifier == 3: assert req.load == b"beef3" return True if req.dataIdentifier == 0xff: assert req.load == b"beefff" return True return False resps = [EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=1)/Raw(b"asdfbeef1")]), EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=2)/Raw(b"beef2")]), EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=3)/Raw(b"beef3")]), EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0xff)/Raw(b"beefff")]), EcuResponse(None, [UDS()/UDS_WDBIPR()], answers=wdbi_handler), EcuResponse(None, [UDS()/UDS_NR(negativeResponseCode="subFunctionNotSupported", requestServiceId="ReadDataByIdentifier")])] es = [UDS_WDBISelectiveEnumerator()] scanner = executeScannerInVirtualEnvironment( resps, es, UDS_RDBIEnumerator_kwargs={"scan_range": range(0x100)}) assert scanner.scan_completed assert scanner.progress() > 0.95 tc = scanner.configuration.stages[0][0] assert len(tc.results_without_response) < 10 if tc.results_without_response: tc.show() assert len(tc.results_with_negative_response) == 256 - 4 assert len(tc.results_with_positive_response) == 4 assert len(tc.scanned_states) == 1 result = tc.show(dump=True) assert "asdfbeef1" in result assert "beef2" in result assert "beef3" in result assert "beefff" in result assert "subFunctionNotSupported received" in result ids = [t.req.identifiers[0] for t in tc.results_with_positive_response] assert 1 in ids assert 2 in ids assert 3 in ids assert 0xff in ids ######################### WDBI ############################# tc = scanner.configuration.stages[0][1] assert len(tc.results_without_response) < 10 if tc.results_without_response: tc.show() assert len(tc.results_with_negative_response) == 0 assert len(tc.results_with_positive_response) == 4 assert len(tc.scanned_states) == 1 result = tc.show(dump=True) ids = [t.req.dataIdentifier for t in tc.results_with_positive_response] assert 1 in ids assert 2 in ids assert 3 in ids assert 0xff in ids = UDS_WDBIEnumerator 2 def wdbi_handler(resp, req): if req.service != 0x2E: return False assert req.dataIdentifier in [1, 2, 3, 0xff] resp.dataIdentifier = req.dataIdentifier if req.dataIdentifier == 1: assert req.load == b'asdfbeef1' return True if req.dataIdentifier == 2: assert req.load == b'beef2' return True if req.dataIdentifier == 3: assert req.load == b"beef3" return True if req.dataIdentifier == 0xff: assert req.load == b"beefff" return True return False resps = [EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=1)/Raw(b"asdfbeef1")]), EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=2)/Raw(b"beef2")]), EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=3)/Raw(b"beef3")]), EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0xff)/Raw(b"beefff")]), EcuResponse(None, [UDS()/UDS_WDBIPR()], answers=wdbi_handler), EcuResponse(None, [UDS()/UDS_NR(negativeResponseCode="subFunctionNotSupported", requestServiceId="ReadDataByIdentifier")])] es = [UDS_WDBISelectiveEnumerator] scanner = executeScannerInVirtualEnvironment( resps, es, UDS_RDBIEnumerator_kwargs={"scan_range": range(0x100)}) assert scanner.scan_completed assert scanner.progress() > 0.95 tc = scanner.configuration.test_cases[0][0] assert len(tc.results_without_response) < 10 if tc.results_without_response: tc.show() assert len(tc.results_with_negative_response) == 256 - 4 assert len(tc.results_with_positive_response) == 4 assert len(tc.scanned_states) == 1 result = tc.show(dump=True) assert "asdfbeef1" in result assert "beef2" in result assert "beef3" in result assert "beefff" in result assert "subFunctionNotSupported received" in result ids = [t.req.identifiers[0] for t in tc.results_with_positive_response] assert 1 in ids assert 2 in ids assert 3 in ids assert 0xff in ids ######################### WDBI ############################# tc = scanner.configuration.test_cases[0][1] assert len(tc.results_without_response) < 10 if tc.results_without_response: tc.show() assert len(tc.results_with_negative_response) == 0 assert len(tc.results_with_positive_response) == 4 assert len(tc.scanned_states) == 1 result = tc.show(dump=True) ids = [t.req.dataIdentifier for t in tc.results_with_positive_response] assert 1 in ids assert 2 in ids assert 3 in ids assert 0xff in ids = UDS_TPEnumerator resps = [EcuResponse(None, [UDS()/UDS_TPPR()]), EcuResponse(None, [UDS()/UDS_NR(negativeResponseCode="serviceNotSupported", requestServiceId="TesterPresent")])] es = [UDS_TPEnumerator] scanner = executeScannerInVirtualEnvironment(resps, es) assert scanner.scan_completed assert scanner.progress() > 0.95 tc = scanner.configuration.test_cases[0] assert len(tc.results_without_response) < 10 if tc.results_without_response: tc.show() assert len(tc.results_with_negative_response) == 0 assert len(tc.results_with_positive_response) == 2 assert len(tc.scanned_states) == 2 assert tc.show(dump=True) = UDS_EREnumerator resps = [EcuResponse(None, [UDS()/UDS_ERPR(resetType=1)]), EcuResponse(None, [UDS()/UDS_ERPR(resetType=3)]), EcuResponse(None, [UDS()/UDS_NR(negativeResponseCode="subFunctionNotSupported", requestServiceId="ECUReset")])] es = [UDS_EREnumerator] scanner = executeScannerInVirtualEnvironment(resps, es) assert scanner.scan_completed assert scanner.progress() > 0.95 tc = scanner.configuration.test_cases[0] assert len(tc.results_without_response) < 10 if tc.results_without_response: tc.show() assert len(tc.results_with_negative_response) == 256 - 2 assert len(tc.results_with_positive_response) == 2 assert len(tc.scanned_states) == 1 result = tc.show(dump=True) assert "hardReset" in result assert "softReset" in result assert "subFunctionNotSupported received" in result ids = [t.req.resetType for t in tc.results_with_positive_response] assert 1 in ids assert 3 in ids = UDS_CCEnumerator resps = [EcuResponse(None, [UDS()/UDS_CCPR(controlType=1)]), EcuResponse(None, [UDS()/UDS_CCPR(controlType=3)]), EcuResponse(None, [UDS()/UDS_NR(negativeResponseCode="subFunctionNotSupported", requestServiceId="CommunicationControl")])] es = [UDS_CCEnumerator] scanner = executeScannerInVirtualEnvironment(resps, es, inter=0.001) assert scanner.scan_completed assert scanner.progress() > 0.95 tc = scanner.configuration.test_cases[0] assert len(tc.results_without_response) < 10 if tc.results_without_response: tc.show() assert len(tc.results_with_negative_response) == 256 - 2 assert len(tc.results_with_positive_response) == 2 assert len(tc.scanned_states) == 1 result = tc.show(dump=True) assert "enableRxAndDisableTx" in result assert "disableRxAndTx" in result assert "subFunctionNotSupported received" in result ids = [t.req.controlType for t in tc.results_with_positive_response] assert 1 in ids assert 3 in ids = UDS_RDBPIEnumerator UDS_RDBPI.periodicDataIdentifiers[1] = "identifierElectric" UDS_RDBPI.periodicDataIdentifiers[3] = "identifierGas" resps = [EcuResponse(None, [UDS()/UDS_RDBPIPR(periodicDataIdentifier=1, dataRecord=b'electric')]), EcuResponse(None, [UDS()/UDS_RDBPIPR(periodicDataIdentifier=3, dataRecord=b'gas')]), EcuResponse(None, [UDS()/UDS_NR(negativeResponseCode="subFunctionNotSupported", requestServiceId="ReadDataPeriodicIdentifier")])] es = [UDS_RDBPIEnumerator] scanner = executeScannerInVirtualEnvironment(resps, es) assert scanner.scan_completed assert scanner.progress() > 0.95 tc = scanner.configuration.test_cases[0] assert len(tc.results_without_response) < 10 if tc.results_without_response: tc.show() assert len(tc.results_with_negative_response) == 256 - 2 assert len(tc.results_with_positive_response) == 2 assert len(tc.scanned_states) == 1 result = tc.show(dump=True) assert "electric" in result assert "gas" in result assert "0x01 identifierElectric" in result assert "0x03 identifierGas" in result assert "subFunctionNotSupported received" in result ids = [t.req.periodicDataIdentifier for t in tc.results_with_positive_response] assert 1 in ids assert 3 in ids = UDS_RCEnumerator resps = [EcuResponse(None, [UDS()/UDS_RCPR(routineControlType=1, routineIdentifier=1)/Raw(b"asdfbeef1")]), EcuResponse(None, [UDS()/UDS_RCPR(routineControlType=2, routineIdentifier=2)/Raw(b"beef2")]), EcuResponse(None, [UDS()/UDS_RCPR(routineControlType=3, routineIdentifier=3)/Raw(b"beef3")]), EcuResponse(None, [UDS()/UDS_RCPR(routineControlType=1, routineIdentifier=0x10)/Raw(b"beefff")]), EcuResponse(None, [UDS()/UDS_NR(negativeResponseCode="subFunctionNotSupported", requestServiceId="RoutineControl")])] es = [UDS_RCEnumerator] scanner = executeScannerInVirtualEnvironment(resps, es, UDS_RCEnumerator_kwargs={"scan_range": range(0x11)}) assert scanner.scan_completed assert scanner.progress() > 0.95 tc = scanner.configuration.test_cases[0] assert len(tc.results_without_response) < 10 if tc.results_without_response: tc.show() assert len(tc.results_with_negative_response) == 0x11 * 3 - 4 assert len(tc.results_with_positive_response) == 4 assert len(tc.scanned_states) == 1 result = tc.show(dump=True) assert "subFunctionNotSupported received" in result ids = [t.req.routineIdentifier for t in tc.results_with_positive_response] assert 1 in ids assert 2 in ids assert 3 in ids assert 0x10 in ids = UDS_RCSelectiveEnumerator resps = [EcuResponse(None, [UDS()/UDS_RCPR(routineControlType=1, routineIdentifier=1)/Raw(b"asdfbeef1")]), EcuResponse(None, [UDS()/UDS_RCPR(routineControlType=2, routineIdentifier=1)/Raw(b"beef2")]), EcuResponse(None, [UDS()/UDS_RCPR(routineControlType=3, routineIdentifier=1)/Raw(b"beef3")]), EcuResponse(None, [UDS()/UDS_RCPR(routineControlType=1, routineIdentifier=0x10)/Raw(b"beefff")]), EcuResponse(None, [UDS()/UDS_NR(negativeResponseCode="subFunctionNotSupported", requestServiceId="RoutineControl")])] es = [UDS_RCSelectiveEnumerator] scanner = executeScannerInVirtualEnvironment(resps, es, UDS_RCStartEnumerator_kwargs={"scan_range": range(0x11)}) assert scanner.scan_completed assert scanner.progress() > 0.95 tc = scanner.configuration.stages[0][0] assert len(tc.results_without_response) < 10 if tc.results_without_response: tc.show() assert len(tc.results_with_negative_response) == 0x11 - 2 assert len(tc.results_with_positive_response) == 2 assert len(tc.scanned_states) == 1 result = tc.show(dump=True) assert "subFunctionNotSupported received" in result ids = [t.req.routineIdentifier for t in tc.results_with_positive_response] assert 1 in ids assert 0x10 in ids tc = scanner.configuration.stages[0][1] assert len(tc.results_without_response) < 10 if tc.results_without_response: tc.show() assert len(tc.results_with_negative_response) == 538 assert len(tc.results_with_positive_response) == 2 assert len(tc.scanned_states) == 1 result = tc.show(dump=True) assert "subFunctionNotSupported received" in result ids = [t.req.routineIdentifier for t in tc.results_with_positive_response] assert 1 in ids = UDS_IOCBIEnumerator resps = [EcuResponse(None, [UDS()/UDS_IOCBIPR(dataIdentifier=1)/Raw(b"asdfbeef1")]), EcuResponse(None, [UDS()/UDS_IOCBIPR(dataIdentifier=2)/Raw(b"beef2")]), EcuResponse(None, [UDS()/UDS_IOCBIPR(dataIdentifier=3)/Raw(b"beef3")]), EcuResponse(None, [UDS()/UDS_IOCBIPR(dataIdentifier=0xff)/Raw(b"beefff")]), EcuResponse(None, [UDS()/UDS_NR(negativeResponseCode="subFunctionNotSupported", requestServiceId="InputOutputControlByIdentifier")])] es = [UDS_IOCBIEnumerator] scanner = executeScannerInVirtualEnvironment(resps, es, UDS_IOCBIEnumerator_kwargs={"scan_range": range(0x100)}) assert scanner.scan_completed assert scanner.progress() > 0.95 tc = scanner.configuration.test_cases[0] assert len(tc.results_without_response) < 10 if tc.results_without_response: tc.show() assert len(tc.results_with_negative_response) == 0x100 - 4 assert len(tc.results_with_positive_response) == 4 assert len(tc.scanned_states) == 1 result = tc.show(dump=True) assert "asdfbeef1" in result assert "beef2" in result assert "beef3" in result assert "beefff" in result assert "subFunctionNotSupported received" in result ids = [t.req.dataIdentifier for t in tc.results_with_positive_response] assert 1 in ids assert 2 in ids assert 3 in ids assert 0xff in ids = UDS_RDEnumerator memory = dict() for addr in itertools.chain(range(0x1f00), range(0xd000, 0xfff2), range(0xa000, 0xcf00), range(0x2000, 0x5f00)): memory[addr] = addr & 0xff def answers_rd(resp, req): global memory if req.service != 0x34: return False if req.memorySizeLen in [1, 3, 4]: return False if req.memoryAddressLen in [1, 3, 4]: return False addr = getattr(req, "memoryAddress%d" % req.memoryAddressLen) if addr not in memory.keys(): return False resp.memorySizeLen = req.memorySizeLen return True resps = [EcuResponse(None, [UDS()/UDS_RDPR()], answers=answers_rd), EcuResponse(None, [UDS()/UDS_NR(negativeResponseCode="requestOutOfRange", requestServiceId="RequestDownload")])] ####################################################### scanner = executeScannerInVirtualEnvironment( resps, [UDS_RDEnumerator], unstable_socket=False, UDS_RDEnumerator_kwargs={"unittest": True}) assert scanner.scan_completed assert scanner.progress() > 0.95 tc1 = scanner.configuration.test_cases[0] assert len(tc1.results_without_response) < 10 if len(tc1.results_without_response): tc1.show() assert len(tc1.results_with_negative_response) > 400 assert len(tc1.results_with_positive_response) > 40 assert len(tc1.scanned_states) == 1 result = tc1.show(dump=True) assert "requestOutOfRange received " in result = UDS_RMBARandomEnumerator pkt = UDS_RMBARandomEnumerator._random_memory_addr_pkt(4, 4, 10) assert pkt.memorySizeLen == 4 assert pkt.memoryAddressLen == 4 assert pkt.memorySize4 == 10 assert pkt.memoryAddress4 is not None pkt = UDS_RMBARandomEnumerator._random_memory_addr_pkt() assert pkt.memorySizeLen in [1, 2, 3, 4] assert pkt.memoryAddressLen in [1, 2, 3, 4] pkt2 = UDS_RMBARandomEnumerator._random_memory_addr_pkt() assert pkt != pkt2 = UDS_RMBAEnumerator ~ linux not_pypy memory = dict() mem_areas = [(0x100, 0x1f00), (0xd000, 0xff00), (0xa000, 0xc000), (0x3000, 0x5f00)] mem_ranges = [range(s, e) for s, e in mem_areas] mem_inner_borders = [s for s, _ in mem_areas] mem_inner_borders += [e - 1 for _, e in mem_areas] mem_outer_borders = [s - 1 for s, _ in mem_areas] mem_outer_borders += [e for _, e in mem_areas] mem_random_test_points = [] for _ in range(100): mem_random_test_points += [random.choice(list(itertools.chain(*mem_ranges)))] for addr in itertools.chain(*mem_ranges): memory[addr] = addr & 0xff def answers_rmba(resp, req): global memory if req.service != 0x23: return False if req.memorySizeLen in [1, 3, 4]: return False if req.memoryAddressLen in [1, 3, 4]: return False addr = getattr(req, "memoryAddress%d" % req.memoryAddressLen) if addr not in memory.keys(): return False out_mem = list() size = getattr(req, "memorySize%d" % req.memorySizeLen) for i in range(addr, addr + size): try: out_mem.append(memory[i]) except KeyError: pass resp.dataRecord = bytes(out_mem) return True resps = [EcuResponse(None, [UDS()/UDS_RMBAPR(dataRecord=b'')], answers=answers_rmba), EcuResponse(None, [UDS()/UDS_NR(negativeResponseCode="requestOutOfRange", requestServiceId="ReadMemoryByAddress")])] ####################################################### scanner = executeScannerInVirtualEnvironment( resps, [UDS_RMBAEnumerator], unstable_socket=False, UDS_RMBARandomEnumerator_kwargs={"unittest": True}) assert scanner.scan_completed tc1 = scanner.configuration.stages[0][1] assert len(tc1.results_without_response) < 30 if len(tc1.results_without_response): tc1.show() assert len(tc1.results_with_negative_response) > 10 assert len(tc1.results_with_positive_response) > 300 assert len(tc1.scanned_states) == 1 result = tc1.show(dump=True) assert "requestOutOfRange received " in result ############################################################ addrs = tc1._get_memory_addresses_from_results(tc1.results_with_positive_response) print(float([tp in addrs for tp in mem_inner_borders].count(True)) / len(mem_inner_borders)) assert float([tp in addrs for tp in mem_inner_borders].count(True)) / len(mem_inner_borders) > 0.8 print(float([tp in addrs for tp in mem_random_test_points].count(True)) / len(mem_random_test_points)) assert float([tp in addrs for tp in mem_random_test_points].count(True)) / len(mem_random_test_points) > 0.8 print(float([tp not in addrs for tp in mem_outer_borders].count(True)) / len(mem_outer_borders)) assert float([tp not in addrs for tp in mem_outer_borders].count(True)) / len(mem_outer_borders) > 0.7 = UDS_TDEnumerator resps = [EcuResponse(None, [UDS()/UDS_TDPR(blockSequenceCounter=1)]), EcuResponse(None, [UDS()/UDS_TDPR(blockSequenceCounter=3)]), EcuResponse(None, [UDS()/UDS_NR(negativeResponseCode="subFunctionNotSupported", requestServiceId="TransferData")])] es = [UDS_TDEnumerator] scanner = executeScannerInVirtualEnvironment(resps, es) assert scanner.scan_completed assert scanner.progress() > 0.95 tc = scanner.configuration.test_cases[0] assert len(tc.results_without_response) < 10 if tc.results_without_response: tc.show() assert len(tc.results_with_negative_response) == 256 - 2 assert len(tc.results_with_positive_response) == 2 assert len(tc.scanned_states) == 1 result = tc.show(dump=True) assert "subFunctionNotSupported received" in result ids = [t.req.blockSequenceCounter for t in tc.results_with_positive_response] assert 1 in ids assert 3 in ids = BMW_DevJobEnumerator load_contrib("automotive.bmw.definitions") load_contrib("automotive.bmw.enumerator") resps = [EcuResponse(None, [UDS()/DEV_JOB_PR(identifier=0xff00)/Raw(b"asdfbeef1")]), EcuResponse(None, [UDS()/DEV_JOB_PR(identifier=0xff02)/Raw(b"beef2")]), EcuResponse(None, [UDS()/DEV_JOB_PR(identifier=0xff03)/Raw(b"beef3")]), EcuResponse(None, [UDS()/DEV_JOB_PR(identifier=0xffff)/Raw(b"beefff")]), EcuResponse(None, [UDS()/UDS_NR(negativeResponseCode="subFunctionNotSupported", requestServiceId="DevelopmentJob")])] es = [BMW_DevJobEnumerator] scanner = executeScannerInVirtualEnvironment(resps, es, BMW_DevJobEnumerator_kwargs={"scan_range": range(0xFF00, 0x10000)}) assert scanner.scan_completed assert scanner.progress() > 0.95 tc = scanner.configuration.test_cases[0] assert len(tc.results_without_response) < 10 if tc.results_without_response: tc.show() assert len(tc.results_with_negative_response) == 0x100 - 4 assert len(tc.results_with_positive_response) == 4 assert len(tc.scanned_states) == 1 result = tc.show(dump=True) assert "ReadTransportMessageStatus" in result assert "65282" in result assert "65283" in result assert "ReadMemory" in result assert "subFunctionNotSupported received" in result assert "PR: Supported" in result ids = [t.req.identifier for t in tc.results_with_positive_response] assert 0xff00 in ids assert 0xff02 in ids assert 0xff03 in ids assert 0xffff in ids = UDS_ServiceEnumerator weird issue resps = [EcuResponse(None, [UDS()/UDS_NR(negativeResponseCode=0x13, requestServiceId=0x40)]), EcuResponse(None, [UDS()/UDS_NR(negativeResponseCode="subFunctionNotSupported", requestServiceId=0x41)]), EcuResponse(None, [UDS()/UDS_NR(negativeResponseCode="subFunctionNotSupported", requestServiceId=0x11)]), EcuResponse(None, [UDS()/UDS_NR(negativeResponseCode="subFunctionNotSupported", requestServiceId=0x42)]), EcuResponse(None, [UDS()/UDS_NR(negativeResponseCode="subFunctionNotSupported", requestServiceId=0x43)])] es = [UDS_ServiceEnumerator] scanner = executeScannerInVirtualEnvironment( resps, es, UDS_ServiceEnumerator_kwargs={"scan_range": [0x11, 0x40, 0x41, 0x42]}) assert scanner.scan_completed assert scanner.progress() > 0.95 tc = scanner.configuration.test_cases[0] tc.show() assert len(tc.results_with_negative_response) == 4 = UDS_ServiceEnumerator, all range def req_handler(resp, req): if req.service != 0x22: return False if len(req) == 1: resp.negativeResponseCode="generalReject" return True if len(req) == 2: resp.negativeResponseCode="incorrectMessageLengthOrInvalidFormat" return True if len(req) == 3: resp.negativeResponseCode="requestOutOfRange" return True return False resps = [EcuResponse(None, [UDS()/UDS_NR(negativeResponseCode="subFunctionNotSupported", requestServiceId="ReadDataByIdentifier")], req_handler)] es = [UDS_ServiceEnumerator] debug_dissector_backup = conf.debug_dissector # This Enumerator is sending corrupted Packets, therefore we need to disable the debug_dissector conf.debug_dissector = False scanner = executeScannerInVirtualEnvironment( resps, es, UDS_ServiceEnumerator_kwargs={"request_length": 3, "scan_range": range(256)}, unstable_socket=False) conf.debug_dissector = debug_dissector_backup assert scanner.scan_completed assert scanner.progress() > 0.95 tc = scanner.configuration.test_cases[0] assert len(tc.results_without_response) < 10 if tc.results_without_response: tc.show() tc.show() assert len(tc.scanned_states) == 1 result = tc.show(dump=True) assert "incorrectMessageLengthOrInvalidFormat" in result assert "requestOutOfRange" in result + Cleanup = Delete testsockets cleanup_testsockets()