% Regression tests for the Ecu utility # More information at http://www.secdev.org/projects/UTscapy/ ############ ############ + Setup ~ conf command = Load modules import copy import itertools load_contrib("isotp", globals_dict=globals()) load_contrib("automotive.uds", globals_dict=globals()) load_contrib("automotive.gm.gmlan", globals_dict=globals()) load_layer("can", globals_dict=globals()) conf.contribs["CAN"]["swap-bytes"] = True = Load Ecu module load_contrib("automotive.ecu", globals_dict=globals()) from scapy.contrib.automotive.uds_ecu_states import * from scapy.contrib.automotive.uds_logging import * from scapy.contrib.automotive.gm.gmlan_ecu_states import * from scapy.contrib.automotive.gm.gmlan_logging import * + EcuState Basic checks = Check EcuState basic functionality state = EcuState() state["session"] = 2 state["securityAccess"] = 4 print(repr(state)) assert repr(state) == "securityAccess4session2" = More complex tests state = EcuState(ses=4) assert state.ses == 4 state.ses = 5 assert state.ses == 5 = Even more complex tests state = EcuState(myinfo="42") state.ses = 5 assert state.ses == 5 state["ses"] = None assert state.ses is None state.ses = 5 assert 5 == state.ses assert "42" == state.myinfo assert repr(state) == "myinfo42ses5" = Delete Attribute Test state = EcuState(myinfo="42") state.ses = 5 assert state.ses == 5 del state.ses try: x = state.ses assert False except (KeyError, AttributeError): assert state.myinfo == "42" = Copy tests state = EcuState(myinfo="42") state.ses = 5 ns = copy.copy(state) ns.ses = 6 assert ns.ses == 6 assert state.ses == 5 assert ns.myinfo == "42" = Move tests state = EcuState(myinfo="42") state.ses = 5 ns = state ns.ses = 6 assert ns.ses == 6 assert state.ses == 6 assert ns.myinfo == "42" = equal tests state = EcuState(myinfo="42") state.ses = 5 ns = copy.copy(state) assert state == ns assert hash(state) == hash(ns) ns.ses = 6 assert state != ns assert hash(state) != hash(ns) ns.ses = 5 assert state == ns assert hash(state) == hash(ns) ns.sa = 5 assert state != ns assert hash(state) != hash(ns) = hash tests state = EcuState(myinfo="42") state.ses = 5 ns = copy.copy(state) assert hash(state) == hash(ns) ns.ses = 6 assert hash(state) != hash(ns) ns.ses = 5 assert hash(state) == hash(ns) ns.sa = 5 assert hash(state) != hash(ns) = command tests state = EcuState(myinfo="42") state.ses = 5 state.command() assert "EcuState(myinfo='42', ses=5)" == state.command() = less than tests s1 = EcuState() s2 = EcuState() s1.a = 1 s2.a = 2 assert s1 < s2 s1.b = 4 assert s1 > s2 s2.b = 1 assert s1 < s2 s1.a = 2 assert s1 > s2 = less than tests 2 s1 = EcuState() s2 = EcuState() s1.c = "x" s2.c = 4 exception = False try: assert s1 < s2 except TypeError: exception = True assert exception = less than tests 3 s1 = EcuState() s2 = EcuState() s1.A = 1 s1.a = 2 s2.A = 2 s2.a = 1 assert s1 < s2 = less than tests 4 s1 = EcuState() s2 = EcuState() s1.A = 1 s1.a = 2 s2.A = 2 s2.b = 100 assert s1 < s2 = less than tests 5 s1 = EcuState() s2 = EcuState() s1.A = 100 s1.a = 2 s2.A = 2 s2.b = 100 assert s1 > s2 assert not s1 > s1 assert not s1 < s1 = less than tests 6 s1 = EcuState() s2 = EcuState() s1.A = 100 s1.B = 200 s2.a = 2 s2.b = 1 assert s1 < s2 = contains test s1 = EcuState(ses=[1,2,3]) s2 = EcuState(ses=1) assert s1 != s2 assert s2 in s1 assert s1 not in s2 s1 = EcuState(ses=[2,3]) s2 = EcuState(ses=1) assert s1 != s2 assert s2 not in s1 assert s1 not in s2 s1 = EcuState(ses=[1,2,3], security=5) s2 = EcuState(ses=1) assert s1 != s2 assert s2 not in s1 assert s1 not in s2 s1 = EcuState(ses=range(4), security=[None, 5]) s2 = EcuState(ses=1) assert s1 != s2 assert s2 in s1 assert s1 not in s2 s1 = EcuState(ses=range(4), security=[None, 5]) s2 = EcuState(ses=range(2)) assert s1 != s2 assert s2 < s1 assert s2 in s1 assert s1 not in s2 s1 = EcuState(ses=range(4), security=[None, 5]) s2 = EcuState(ses=range(2), security=5) assert s1 != s2 assert s2 in s1 assert s1 not in s2 s1 = EcuState(ses=range(4), security=[None, 5]) s2 = EcuState(ses=range(5)) assert s1 != s2 assert s2 not in s1 assert s1 not in s2 s1 = EcuState(ses=range(4), security=[None, range(5)]) s2 = EcuState(ses=3) print(s1._expand()) print(s2._expand()) assert s1 != s2 assert s2 in s1 assert s1 not in s2 s1 = EcuState(ses=range(4), security=[None, range(5), [5, 7, range(4), [range(10), 10]]]) s2 = EcuState(ses=3, security=10) print(s1._expand()) print(s2._expand()) assert s1 != s2 assert s2 in s1 assert s1 not in s2 s1 = EcuState(ses=range(4), security=[None, range(5), [5, 7, range(4), [range(10), "B"]]]) s2 = EcuState(ses=3, security="B") print(s1._expand()) print(s2._expand()) assert s1 != s2 assert s2 in s1 assert s1 not in s2 s1 = EcuState(ses=range(4), security=[None, range(5), [5, 7, range(4), [range(10), "B"]]]) s2 = EcuState(ses=3, security="C") print(s1._expand()) print(s2._expand()) assert s1 != s2 assert s2 not in s1 assert s1 not in s2 s1 = EcuState(ses=range(3), security=5) s2 = EcuState(ses=1, security=5) assert s1 != s2 assert s2 in s1 assert s1 not in s2 s1 = EcuState(ses=range(3), security=(x for x in range(1, 10, 2))) s2 = EcuState(ses=1, security=5) assert s1 != s2 assert s2 in s1 assert s1 not in s2 s1 = EcuState(ses=[1,2,3]) s2 = EcuState(ses=[1,2,3]) assert s1 in s2 assert s2 in s1 assert s1 == s2 s1 = EcuState(ses=1) s2 = EcuState(ses=1) assert s1 in s2 assert s2 in s1 assert s1 == s2 s1 = EcuState(ses=range(3), security=range(5)) for ses, sec in itertools.product(range(3), range(5)): s2 = EcuState(ses=ses, security=sec) assert s1 != s2 assert s2 in s1 assert s1 not in s2 s1 = EcuState(ses=[0, 1, 2], security=[43, 44]) for ses, sec in itertools.product(range(3), range(43, 45)): s2 = EcuState(ses=ses, security=sec) assert s1 != s2 assert s2 in s1 assert s1 not in s2 s1 = EcuState(ses=[0, 1, 2], security=["a", "b"]) for ses, sec in itertools.product(range(3), (x for x in "ab")): s2 = EcuState(ses=ses, security=sec) assert s1 != s2 assert s2 in s1 try: assert s1 not in s2 except TypeError: assert True s1 = [EcuState(ses=1), EcuState(ses=2), EcuState(ses=3)] s2 = EcuState(ses=3) assert s2 in s1 assert s1 not in s2 s1 = EcuState(ses=1, sa="SEC") s2 = EcuState(ses=1, sa="SOC") assert s1 not in s2 assert s2 not in s1 assert s1 != s2 s1 = EcuState(ses=1, sa="SEC") s2 = EcuState(ses=1, sa="SEC") assert s1 in s2 assert s2 in s1 assert s1 == s2 s1 = EcuState(ses=1, sa="SEC") s2 = EcuState(ses=1, sa=["SEC", "SOL"]) assert s1 in s2 assert s2 not in s1 assert s1 != s2 s1 = EcuState(ses=1, sa=b"SEC") s2 = EcuState(ses=1, sa=[b"SEC", "SOL"]) assert s1 in s2 assert s2 not in s1 assert s1 != s2 + EcuState modification tests = Basic definitions for tests class myPack1(Packet): fields_desc = [ IntField("fakefield", 1) ] class myPack2(Packet): fields_desc = [ IntField("statefield", 1) ] @EcuState.extend_pkt_with_modifier(myPack2) def modify_ecu_state(self, req, ecustate): # type: (Packet, Packet, EcuState) -> None ecustate.state = self.statefield pkt = myPack1()/myPack2() st = EcuState() exception = False try: assert st.state == 1 except AttributeError: exception = True assert exception == True assert EcuState.is_modifier_pkt(pkt) assert not EcuState.is_modifier_pkt(myPack1()) mod = EcuState.get_modified_ecu_state(pkt, Raw(), st) assert mod != st assert mod.state ==1 pkt2 = myPack1()/myPack1()/myPack1()/myPack2(statefield=5) mod2 = EcuState.get_modified_ecu_state(pkt2, Raw(), mod) assert mod != mod2 assert mod < mod2 pkt2 = myPack1()/myPack1()/myPack1()/myPack2(statefield=4)/myPack2(statefield=5) mod2 = EcuState.get_modified_ecu_state(pkt2, Raw(), mod) mod.state = 5 assert mod != mod2 assert mod > mod2 + EcuResponse tests = Basic checks resp = EcuResponse(EcuState(session=1), UDS()/UDS_DSCPR(b"\x03")) assert not resp.supports_state(EcuState()) assert not resp.supports_state(EcuState(session=2)) assert resp.supports_state(EcuState(session=1)) assert resp.answers(UDS()/UDS_DSC(b"\x03")) = Command checks resp = EcuResponse(EcuState(session=1), UDS()/UDS_DSCPR(b"\x03")) cmd = resp.command() print(cmd) resp1 = eval(cmd) assert resp1 == resp = Command checks 2 p1 = UDS(bytes(UDS()/UDS_NR(b"\x10\x00"))) p2 = UDS(bytes(UDS()/UDS_DSCPR(b"\x03"))) resp = EcuResponse([EcuState(session=1), EcuState(session=3)], [p1, p2]) cmd = resp.command() print(cmd) resp1 = eval(cmd) assert any(resp1.supports_state(s) for s in resp.states) assert any(resp.supports_state(s) for s in resp1.states) assert len(resp.responses) == len(resp1.responses) assert all(bytes(x) == bytes(y) for x, y in zip(resp.responses, resp1.responses)) assert resp1 == resp = Compare check p1 = UDS(bytes(UDS()/UDS_NR(b"\x10\x00"))) p2 = UDS(bytes(UDS()/UDS_DSCPR(b"\x03"))) resp = EcuResponse([EcuState(session=1), EcuState(session=3)], [p1, p2]) resp1 = EcuResponse([EcuState(session=1)], [p1, p2]) resp2 = EcuResponse([EcuState(session=2)], [p1, p2]) resp3 = EcuResponse([EcuState(session=1)], [p2]) assert resp == resp1 assert resp != resp2 assert resp != resp3 = Key response check req = UDS()/UDS_DSC(b"\x03") p1 = UDS(bytes(UDS()/UDS_NR(b"\x10\x00"))) p2 = UDS(bytes(UDS()/UDS_DSCPR(b"\x03"))) resp = EcuResponse([EcuState(session=1), EcuState(session=3)], [p1, p2]) assert resp.answers(req) assert resp.key_response.answers(req) + Ecu Simple operations = Log all commands applied to an Ecu msgs = [UDS(service=16) / UDS_DSC(diagnosticSessionType=3), UDS(service=16) / UDS_DSC(diagnosticSessionType=4), UDS(service=16) / UDS_DSC(diagnosticSessionType=5), UDS(service=16) / UDS_DSC(diagnosticSessionType=6), UDS(service=16) / UDS_DSC(diagnosticSessionType=2)] ecu = Ecu(verbose=False, store_supported_responses=False) ecu.update(PacketList(msgs)) assert len(ecu.log["DiagnosticSessionControl"]) == 5 timestamp, value = ecu.log["DiagnosticSessionControl"][0] assert timestamp > 0 assert value == "extendedDiagnosticSession" assert ecu.log["DiagnosticSessionControl"][-1][1] == "programmingSession" = Trace all commands applied to an Ecu msgs = [UDS(service=16) / UDS_DSC(diagnosticSessionType=3), UDS(service=80) / UDS_DSCPR(diagnosticSessionType=3, sessionParameterRecord=b'\\x002\\x01\\xf4')] ecu = Ecu(verbose=True, logging=False, store_supported_responses=False) ecu.update(PacketList(msgs)) assert ecu.state.session == 3 = Generate supported responses of an Ecu msgs = [UDS(service=16) / UDS_DSC(diagnosticSessionType=3), UDS(service=80) / UDS_DSCPR(diagnosticSessionType=3, sessionParameterRecord=b'\\x002\\x01\\xf4'), UDS(service=16) / UDS_DSC(diagnosticSessionType=4)] ecu = Ecu(verbose=False, logging=False, store_supported_responses=True) ecu.update(PacketList(msgs)) supported_responses = ecu.supported_responses unanswered_packets = ecu.unanswered_packets assert ecu.state.session == 3 assert len(supported_responses) == 1 assert len(unanswered_packets) == 1 response = supported_responses[0] print(response.command()) assert response.supports_state(EcuState()) assert response.key_response.service == 80 assert unanswered_packets[0].diagnosticSessionType == 4 + Ecu Advanced checks = Analyze multiple UDS messages udsmsgs = sniff(offline=scapy_path("test/pcaps/ecu_trace.pcap.gz"), session=ISOTPSession(use_ext_address=False, basecls=UDS), count=50, timeout=3) assert len(udsmsgs) == 50 ecu = Ecu() ecu.update(udsmsgs) response = ecu.supported_responses[0] assert response.supports_state(EcuState()) assert response.key_response.service == 80 assert response.key_response.diagnosticSessionType == 3 response = ecu.supported_responses[1] assert response.supports_state(EcuState(session=3)) assert response.key_response.service == 80 assert response.key_response.diagnosticSessionType == 2 response = ecu.supported_responses[4] print(response) state = EcuState(session=2, security_level=18) print(state) assert response.supports_state(state) assert response.key_response.service == 110 assert response.key_response.dataIdentifier == 61786 assert len(ecu.log["TransferData"]) == 2 + EcuSession tests = Analyze on the fly with EcuSession session = EcuSession() with PcapReader(scapy_path("test/pcaps/ecu_trace.pcap.gz")) as sock: udsmsgs = sniff(session=ISOTPSession(supersession=session, use_ext_address=False, basecls=UDS), count=50, opened_socket=sock, timeout=3) assert len(udsmsgs) == 50 ecu = session.ecu response = ecu.supported_responses[0] assert response.supports_state(EcuState()) assert response.key_response.service == 80 assert response.key_response.diagnosticSessionType == 3 response = ecu.supported_responses[1] assert response.supports_state(EcuState(session=3)) assert response.key_response.service == 80 assert response.key_response.diagnosticSessionType == 2 response = ecu.supported_responses[4] print(response) state = EcuState(session=2, security_level=18) print(state) assert response.supports_state(state) assert response.key_response.service == 110 assert response.key_response.dataIdentifier == 61786 assert len(ecu.log["TransferData"]) == 2 = Analyze on the fly with EcuSession GMLAN1 session = EcuSession() conf.contribs['CAN']['swap-bytes'] = True with PcapReader(scapy_path("test/pcaps/gmlan_trace.pcap.gz")) as sock: gmlanmsgs = sniff(session=ISOTPSession(supersession=session, rx_id=[0x241, 0x641, 0x101], basecls=GMLAN), count=2, opened_socket=sock, timeout=3) ecu = session.ecu print("Check 1 after change to diagnostic mode") assert len(ecu.supported_responses) == 1 assert ecu.state == EcuState(session=3) gmlanmsgs = sniff(session=ISOTPSession(supersession=session, rx_id=[0x241, 0x641, 0x101], basecls=GMLAN), count=6, opened_socket=sock) ecu = session.ecu print("Check 2 after some more messages were read1") assert len(ecu.supported_responses) == 3 print("Check 2 after some more messages were read2") assert ecu.state.session == 3 print("assert 1") assert ecu.state.communication_control == 1 gmlanmsgs = sniff(session=ISOTPSession(supersession=session, rx_id=[0x241, 0x641, 0x101], basecls=GMLAN), count=2, opened_socket=sock) ecu = session.ecu print("Check 3 after change to programming mode (bootloader)") assert len(ecu.supported_responses) == 4 assert ecu.state.session == 2 assert ecu.state.communication_control == 1 gmlanmsgs = sniff(session=ISOTPSession(supersession=session, rx_id=[0x241, 0x641, 0x101], basecls=GMLAN), count=6, opened_socket=sock) ecu = session.ecu print("Check 4 after gaining security access") assert len(ecu.supported_responses) == 6 assert ecu.state.session == 2 assert ecu.state.security_level == 2 assert ecu.state.communication_control == 1 = Analyze on the fly with EcuSession GMLAN logging test session = EcuSession(verbose=False, store_supported_responses=False) conf.contribs['GMLAN']['GMLAN_ECU_AddressingScheme'] = 4 conf.contribs['CAN']['swap-bytes'] = True conf.debug_dissector = True gmlanmsgs = sniff(offline=scapy_path("test/pcaps/gmlan_trace.pcap.gz"), session=ISOTPSession(supersession=session, rx_id=[0x241, 0x641, 0x101], basecls=GMLAN), count=200, timeout=6) ecu = session.ecu assert len(ecu.supported_responses) == 0 assert len([m for m in gmlanmsgs if m.sprintf("%GMLAN.service%") == "TransferData"]) == len(ecu.log["TransferData"]) assert len([m for m in gmlanmsgs if m.sprintf("%GMLAN.service%") == "RequestDownload"]) == len(ecu.log["RequestDownload"]) assert len([m for m in gmlanmsgs if m.sprintf("%GMLAN.service%") == "ReadDataByIdentifier"]) == len(ecu.log["ReadDataByIdentifier"]) assert len(ecu.log["SecurityAccess"]) == 2 assert len(ecu.log["SecurityAccessPositiveResponse"]) == 2 assert ecu.log["TransferData"][-1][1][0] == "downloadAndExecuteOrExecute"