% Regression tests for enumerators ~ linux + Load general modules = Load contribution layer from scapy.contrib.automotive.scanner.enumerator import _AutomotiveTestCaseScanResult, ServiceEnumerator, StateGenerator, StateGeneratingServiceEnumerator from scapy.contrib.automotive.scanner.test_case import TestCaseGenerator, AutomotiveTestCase from scapy.contrib.automotive.scanner.executor import AutomotiveTestCaseExecutor from scapy.contrib.isotp import ISOTP from scapy.contrib.automotive.uds import * from scapy.contrib.automotive.scanner.staged_test_case import StagedAutomotiveTestCase from scapy.utils import SingleConversationSocket from scapy.contrib.automotive.ecu import EcuState, EcuResponse from scapy.contrib.automotive.uds_ecu_states import * import copy + Basic checks = ServiceEnumerator basecls checks pkts = [ _AutomotiveTestCaseScanResult(EcuState(session=1), UDS(b"\x20abcd"), UDS(b"\x60abcd"), 1.0, 1.9), _AutomotiveTestCaseScanResult(EcuState(session=2), UDS(b"\x20abcd"), None, 2.0, None), _AutomotiveTestCaseScanResult(EcuState(session=1), UDS(b"\x21abcd"), UDS(b"\x7fabcd"), 3.0, 3.1), _AutomotiveTestCaseScanResult(EcuState(session=2), UDS(b"\x21abcd"), UDS(b"\x7fa\x10cd"), 4.0, 4.5), ] class MyTestCase(ServiceEnumerator): _supported_kwargs = copy.copy(ServiceEnumerator._supported_kwargs) _supported_kwargs.update({ 'local_kwarg': ((int, str), None), 'verbose': (bool, None), 'global_arg': (str, None) }) def _get_initial_requests(self, **kwargs): # type: (Any) -> Iterable[Packet] return UDS(service=range(1, 11)) def _get_table_entry_y(self, tup): # type: (_AutomotiveTestCaseScanResult) -> str return "0x%02x: %s" % (tup[1].service, tup[1].sprintf("%UDS.service%")) def _get_table_entry_z(self, tup): # type: (_AutomotiveTestCaseScanResult) -> str return self._get_label(tup[2], "PR: Supported") @staticmethod def _get_negative_response_label(response): # type: (Packet) -> str return response.sprintf("NR: %UDS_NR.negativeResponseCode%") @staticmethod def _get_negative_response_code(resp): # type: (Packet) -> int return resp.negativeResponseCode @staticmethod def _get_negative_response_desc(nrc): # type: (int) -> str return UDS_NR(negativeResponseCode=nrc).sprintf( "%UDS_NR.negativeResponseCode%") e = MyTestCase() for p in pkts: p.req.time = p.req_ts p.req.sent_time = p.req_ts if p.resp is not None: p.resp.time = p.resp_ts e._store_result(p.state, p.req, p.resp) = ServiceEnumerator not completed check assert e.completed == False = ServiceEnumerator completed e._state_completed[EcuState(session=1)] = True e._state_completed[EcuState(session=2)] = True assert e.completed = ServiceEnumerator stats check stat_list = e._compute_statistics() stats = {label: value for state, label, value in stat_list if state == "all"} print(stats) assert stats["num_answered"] == '3' assert stats["num_unanswered"] == '1' assert stats["answertime_max"] == '0.9' assert stats["answertime_min"] == '0.1' assert stats["answertime_avg"] == '0.5' assert stats["num_negative_resps"] == '2' = ServiceEnumerator scanned states assert len(e.scanned_states) == 2 assert {EcuState(session=1), EcuState(session=2)} == e.scanned_states = ServiceEnumerator scanned results assert len(e.results_with_positive_response) == 1 assert len(e.results_with_negative_response) == 2 assert len(e.results_without_response) == 1 assert len(e.results_with_response) == 3 = ServiceEnumerator get_label assert e._get_label(pkts[0].resp) == "PR: PositiveResponse" assert e._get_label(pkts[0].resp, lambda _: "positive") == "positive" assert e._get_label(pkts[0].resp, lambda _: "positive" + hex(pkts[0].req.service)) == "positive" + "0x20" assert e._get_label(pkts[1].resp) == "Timeout" assert e._get_label(pkts[2].resp) == "NR: 98" assert e._get_label(pkts[3].resp) == "NR: generalReject" = ServiceEnumerator show e.show(filtered=False) dump = e.show(dump=True, filtered=False) assert "NR: 98" in dump assert "NR: generalReject" in dump assert "PR: Supported" in dump assert "Timeout" in dump assert "session1" in dump assert "session2" in dump assert "0x20" in dump assert "0x21" in dump = ServiceEnumerator filtered results before show print(len(e.filtered_results)) assert len(e.filtered_results) == 2 assert e.filtered_results[0] == pkts[0] assert e.filtered_results[1] == pkts[2] = ServiceEnumerator show filtered e.show(filtered=True) dump = e.show(dump=True, filtered=True) assert "NR: 98" in dump assert "NR: generalReject" in dump assert "PR: Supported" in dump assert "Timeout" not in dump assert "session1" in dump assert "session2" in dump assert "all" in dump assert "0x20" in dump assert "0x21" in dump assert "The following negative response codes are blacklisted: ['serviceNotSupported']" in dump = ServiceEnumerator filtered results after show assert len(e.filtered_results) == 3 assert e.filtered_results[0] == pkts[0] assert e.filtered_results[1] == pkts[2] = ServiceEnumerator supported responses assert len(e.supported_responses) == 3 = ServiceEnumerator evaluate response conf = {} assert False == e._evaluate_response(EcuState(session=1), UDS(b"\x10\x03abcd"), None, **conf) assert False == e._evaluate_response(EcuState(session=1), UDS(b"\x10\x03abcd"), UDS(b"\x7f\x10\x10"), **conf) conf = {"exit_if_service_not_supported": True, "retry_if_busy_returncode": False} assert False == e._evaluate_response(EcuState(session=1), UDS(b"\x10\x03abcd"), UDS(b"\x7f\x10\x21"), **conf) assert False == e._evaluate_response(EcuState(session=1), UDS(b"\x10\x03abcd"), UDS(b"\x7f\x10\x10"), **conf) assert True == e._evaluate_response(EcuState(session=1), UDS(b"\x10\x03abcd"), UDS(b"\x7f\x10\x11"), **conf) assert True == e._evaluate_response(EcuState(session=1), UDS(b"\x10\x03abcd"), UDS(b"\x7f\x10\x7f"), **conf) conf = {"exit_if_service_not_supported": False, "retry_if_busy_returncode": True} assert not e._retry_pkt[EcuState(session=1)] assert False == e._evaluate_response(EcuState(session=1), UDS(b"\x10\x03abcd"), UDS(b"\x7f\x10\x10"), **conf) assert False == e._evaluate_response(EcuState(session=1), UDS(b"\x10\x03abcd"), UDS(b"\x7f\x10\x11"), **conf) assert False == e._evaluate_response(EcuState(session=1), UDS(b"\x10\x03abcd"), UDS(b"\x7f\x10\x7f"), **conf) assert not e._retry_pkt[EcuState(session=1)] assert True == e._evaluate_response(EcuState(session=1), UDS(b"\x10\x03abcd"), UDS(b"\x7f\x10\x21"), **conf) assert e._retry_pkt[EcuState(session=1)] == UDS(b"\x10\x03abcd") assert False == e._evaluate_response(EcuState(session=1), UDS(b"\x10\x03abcd"), UDS(b"\x7f\x10\x21"), **conf) assert not e._retry_pkt[EcuState(session=1)] assert True == e._evaluate_response(EcuState(session=1), UDS(b"\x10\x03abcd"), UDS(b"\x50\x03\x00"), **conf) assert False == e._evaluate_response(EcuState(session=1), UDS(b"\x11\x03abcd"), UDS(b"\x51\x03\x00"), **conf) conf = {"retry_if_none_received": True} assert True == e._evaluate_response(EcuState(session=1), UDS(b"\x10\x03abcd"), None, **conf) assert e._retry_pkt[EcuState(session=1)] = ServiceEnumerator execute from queue import Queue from scapy.supersocket import SuperSocket class MockISOTPSocket(SuperSocket): nonblocking_socket = True @property def closed(self): return False @closed.setter def closed(self, var): pass def __init__(self, rcvd_queue=None): self.rcvd_queue = Queue() self.sent_queue = Queue() if rcvd_queue is not None: for c in rcvd_queue: self.rcvd_queue.put(c) def recv_raw(self, x=MTU): pkt = bytes(self.rcvd_queue.get(True, 0.01)) return UDS, pkt, 10.0 def send(self, x): sx = raw(x) try: x.sent_time = 9.0 except AttributeError: pass self.sent_queue.put(sx) return len(sx) @staticmethod def select(sockets, remain=None): time.sleep(0) return sockets def sr(self, *args, **kargs): from scapy import sendrecv return sendrecv.sndrcv(self, *args, threaded=False, **kargs) def sr1(self, *args, **kargs): from scapy import sendrecv ans = sendrecv.sndrcv(self, *args, threaded=False, **kargs)[0] # type: SndRcvList if len(ans) > 0: pkt = ans[0][1] # type: Packet return pkt else: return None sock = MockISOTPSocket() sock.rcvd_queue.put(b"\x41") sock.rcvd_queue.put(b"\x42") sock.rcvd_queue.put(b"\x43") sock.rcvd_queue.put(b"\x44") sock.rcvd_queue.put(b"\x45") sock.rcvd_queue.put(b"\x46") sock.rcvd_queue.put(b"\x47") sock.rcvd_queue.put(b"\x48") sock.rcvd_queue.put(b"\x49") sock.rcvd_queue.put(b"\x4A") e = MyTestCase() e.execute(sock, EcuState(session=1)) assert len(e.filtered_results) == 10 assert len(e.results_with_response) == 10 assert len(e.results_without_response) == 0 assert e.has_completed(EcuState(session=1)) assert e.completed e.execute(sock, EcuState(session=2), timeout=0.01) assert len(e.filtered_results) == 10 assert len(e.results_with_response) == 10 assert len(e.results_without_response) == 10 assert e.has_completed(EcuState(session=2)) e.execute(sock, EcuState(session=3), timeout=0.01, exit_if_no_answer_received=True) assert not e.has_completed(EcuState(session=3)) assert not e.completed assert len(e.scanned_states) == 3 e.execute(sock, EcuState(session=42), state_block_list=[EcuState(session=42)]) assert e.has_completed(EcuState(session=42)) assert len(e.scanned_states) == 3 e.execute(sock, EcuState(session=13), state_block_list=EcuState(session=13)) assert e.has_completed(EcuState(session=13)) assert len(e.scanned_states) == 3 e.execute(sock, EcuState(session=41), state_allow_list=[EcuState(session=42)]) assert e.has_completed(EcuState(session=41)) assert len(e.scanned_states) == 3 e.execute(sock, EcuState(session=12), state_allow_list=EcuState(session=13)) assert e.has_completed(EcuState(session=12)) assert len(e.scanned_states) == 3 = Test negative response code service not supported sock.rcvd_queue.put(b"\x7f\x01\x11") sock.rcvd_queue.put(b"\x7f\x01\x7f") e = MyTestCase() e.execute(sock, EcuState(session=1), exit_if_service_not_supported=True) assert not e._retry_pkt[EcuState(session=1)] assert len(e.results_with_response) == 1 assert len(e.results_with_negative_response) == 1 assert e.completed e.execute(sock, EcuState(session=2), exit_if_service_not_supported=True) assert not e._retry_pkt[EcuState(session=2)] assert len(e.results_with_response) == 2 assert len(e.results_with_negative_response) == 2 assert e.completed = Test negative response code retry if busy sock.rcvd_queue.put(b"\x7f\x01\x21") sock.rcvd_queue.put(b"\x7f\x01\x10") e = MyTestCase() e.execute(sock, EcuState(session=1)) assert e._retry_pkt[EcuState(session=1)] assert len(e.results_with_response) == 1 assert len(e.results_with_negative_response) == 1 assert len(e.results_without_response) == 0 assert not e.completed e.execute(sock, EcuState(session=1)) assert not e._retry_pkt[EcuState(session=1)] assert len(e.results_with_response) == 2 assert len(e.results_with_negative_response) == 2 assert len(e.results_without_response) == 9 assert e.completed assert e.has_completed(EcuState(session=1)) = Test negative response code don't retry if busy sock.rcvd_queue.put(b"\x7f\x01\x21") e = MyTestCase() e.execute(sock, EcuState(session=1), retry_if_busy_returncode=False) assert not e._retry_pkt[EcuState(session=1)] assert len(e.results_with_response) == 1 assert len(e.results_with_negative_response) == 1 assert len(e.results_without_response) == 9 assert e.completed assert e.has_completed(EcuState(session=1)) = Test execution time sock.rcvd_queue.put(b"\x7f\x01\x10") e = MyTestCase() e.execute(sock, EcuState(session=1), execution_time=-1) assert not e._retry_pkt[EcuState(session=1)] assert len(e.results_with_response) == 1 assert len(e.results_with_negative_response) == 1 assert len(e.results_without_response) == 0 assert not e.completed assert not e.has_completed(EcuState(session=1)) + AutomotiveTestCaseExecutorConfiguration tests = Definitions class MockSock(object): closed = False def sr1(self, *args, **kwargs): raise OSError class TestCase1(MyTestCase): pass class TestCase2(MyTestCase): pass class Scanner(AutomotiveTestCaseExecutor): @property def default_test_case_clss(self): # type: () -> List[Type[AutomotiveTestCaseABC]] return [MyTestCase] = Basic tests tce = Scanner(MockSock(), test_cases=[TestCase1, TestCase2, MyTestCase], verbose=True, debug=True, global_arg="Whatever", TestCase1_kwargs={"local_kwarg": 42}) config = tce.configuration # type: AutomotiveTestCaseExecutorConfiguration assert config.verbose assert config.debug assert len(config.test_cases) == 3 assert len(config.stages) == 0 assert len(config.staged_test_cases) == 0 assert len(config.test_case_clss) == 3 assert len(config.TestCase1.items()) == 5 assert len(config.TestCase2.items()) == 4 assert len(config["TestCase1"].items()) == 5 assert len(config.MyTestCase.items()) == 4 assert config.TestCase1["verbose"] assert config.TestCase1["debug"] assert config.TestCase1["local_kwarg"] == 42 assert config.TestCase1["global_arg"] == "Whatever" assert config.TestCase2["global_arg"] == "Whatever" assert config.MyTestCase["global_arg"] == "Whatever" assert isinstance(tce.socket, SingleConversationSocket) = Basic tests with default values tce = Scanner(MockSock()) config = tce.configuration # type: AutomotiveTestCaseExecutorConfiguration assert not config.verbose assert not config.debug assert len(config.test_cases) == 1 assert len(config.MyTestCase.items()) == 1 assert isinstance(tce.socket, SingleConversationSocket) = Basic test with stages def connector(testcase1, _): scan_range = len(testcase1.results) return {"verbose": True, "scan_range": scan_range} tc1 = TestCase1() tc2 = TestCase2() pipeline = StagedAutomotiveTestCase([tc1, tc2], [None, connector]) tce = Scanner(MockSock(), test_cases=[pipeline]) config = tce.configuration # type: AutomotiveTestCaseExecutorConfiguration assert not config.verbose assert not config.debug assert len(config.test_cases) == 1 assert len(config.stages) == 1 assert len(config.staged_test_cases) == 2 assert len(config.test_case_clss) == 3 assert len(config.StagedAutomotiveTestCase.items()) == 1 assert isinstance(tce.socket, SingleConversationSocket) = Basic tests with two stages def connector(testcase1, testcase2): scan_range = len(testcase1.results) return {"verbose": True, "scan_range": scan_range} tc1 = TestCase1() tc2 = TestCase2() pipeline = StagedAutomotiveTestCase([tc1, tc2], [None, connector]) class StagedTest(StagedAutomotiveTestCase): pass pipeline2 = StagedTest([MyTestCase(), MyTestCase()]) tce = Scanner(MockSock(), test_cases=[pipeline, pipeline2], verbose=True) config = tce.configuration # type: AutomotiveTestCaseExecutorConfiguration assert config.verbose assert not config.debug assert len(config.test_cases) == 2 assert len(config.stages) == 2 assert len(config.staged_test_cases) == 4 assert len(config.test_case_clss) == 5 assert len(config.StagedAutomotiveTestCase.items()) == 2 assert len(config.StagedTest.items()) == 2 assert len(config.TestCase1.items()) == 2 assert len(config.TestCase2.items()) == 2 assert len(config.MyTestCase.items()) == 2 assert isinstance(tce.socket, SingleConversationSocket) assert len(tce.state_paths) == 1 assert len(tce.final_states) == 1 tce.state_graph.add_edge((tce.final_states[0], EcuState(session=2))) assert len(tce.state_paths) == 2 assert len(tce.final_states) == 2 assert not tce.scan_completed = Reset Handler tests reset_flag = False def reset_func(): global reset_flag reset_flag = True tce = Scanner(MockSock(), reset_handler=reset_func) tce.target_state = EcuState(session=2) tce.reset_target() assert reset_flag assert tce.target_state == EcuState(session=1) = Reset Handler tests 2 tce = Scanner(MockSock()) tce.target_state = EcuState(session=2) tce.reset_target() assert tce.target_state == EcuState(session=1) = Reconnect Handler tests class MockSocket2: closed = False def reconnect_func(): return MockSocket2() tce = Scanner(MockSock(), reconnect_handler=reconnect_func) print(tce.socket) print(repr(tce.socket)) assert isinstance(tce.socket._inner, MockSock) tce.reconnect() assert isinstance(tce.socket._inner, MockSocket2) = Reconnect Handler tests 2 closed = False class MockSocket1: closed = False def close(self): global closed closed = True class MockSocket2: closed = False def reconnect_func(): return MockSocket2() tce = Scanner(MockSocket1(), reconnect_handler=reconnect_func) print(tce.socket) print(repr(tce.socket)) assert isinstance(tce.socket._inner, MockSocket1) tce.reconnect() assert isinstance(tce.socket._inner, MockSocket2) assert closed = TestCase execute pre_exec = False execute = False post_exec = False class TestCase42(MyTestCase): def pre_execute(self, socket, # type: _SocketUnion state, # type: EcuState global_configuration # type: AutomotiveTestCaseExecutorConfiguration # noqa: E501 ): # type: (...) -> None global pre_exec assert state == EcuState(session=1) assert global_configuration.TestCase42["local_kwarg"] == 42 assert global_configuration.TestCase42["verbose"] assert global_configuration.TestCase42["debug"] global_configuration.TestCase42["local_kwarg"] = 1 pre_exec = True def execute(self, socket, state, local_kwarg, verbose, debug, **kwargs): global execute assert verbose assert debug assert local_kwarg == 1 execute = True def post_execute(self, socket, # type: _SocketUnion state, # type: EcuState global_configuration # type: AutomotiveTestCaseExecutorConfiguration # noqa: E501 ): # type: (...) -> None global post_exec assert global_configuration.TestCase42["local_kwarg"] == 1 assert global_configuration.TestCase42["verbose"] assert global_configuration.TestCase42["debug"] post_exec = True tce = Scanner(MockSock(), test_cases=[TestCase42], verbose=True, debug=True, TestCase42_kwargs={"local_kwarg": 42}) tce.execute_test_case(TestCase42()) assert pre_exec == execute == post_exec == True = TestCase execute StateGenerator transition_done = False def transition_func(sock, conf, kwargs): assert kwargs["arg42"] == "hello" assert conf.TestCase43["local_kwarg"] == "world" global transition_done transition_done = True return True class TestCase43(MyTestCase, StateGenerator): def get_new_edge(self, socket, config): assert config.TestCase43["local_kwarg"] == "world" return EcuState(session=1), EcuState(session=2) def get_transition_function(self, socket, edge): assert edge[0] == EcuState(session=1) assert edge[1] == EcuState(session=2) return transition_func, {"arg42": "hello"}, None def execute(self, socket, state, **kwargs): return True tce = Scanner(MockSock(), test_cases=[TestCase43], TestCase43_kwargs={"local_kwarg": "world"}) assert len(tce.final_states) == 1 tce.execute_test_case(TestCase43()) assert len(tce.final_states) == 2 assert EcuState(session=1) in tce.final_states and EcuState(session=2) in tce.final_states assert tce.enter_state(EcuState(session=1), EcuState(session=2)) assert transition_done = TestCase execute StateGenerator no edge class TestCase43(MyTestCase, StateGenerator): def get_new_edge(self, socket, config): assert config.TestCase43["local_kwarg"] == "world" return None def execute(self, socket, state, **kwargs): return True def get_transition_function(self, socket, edge): raise NotImplementedError() tce = Scanner(MockSock(), test_cases=[TestCase43], TestCase43_kwargs={"local_kwarg": "world"}) assert len(tce.final_states) == 1 tce.execute_test_case(TestCase43()) assert len(tce.final_states) == 1 assert EcuState(session=1) in tce.final_states assert not tce.enter_state(EcuState(session=1), EcuState(session=2)) = TestCase execute StateGenerator with cleanupfunc transition_done = False cleanup_done = False def transition_func(sock, conf, kwargs): assert kwargs["arg42"] == "hello" assert conf.TestCase43["local_kwarg"] == "world" global transition_done transition_done = True return True def cleanup_func(sock, conf): assert conf.TestCase43["local_kwarg"] == "world" global cleanup_done cleanup_done = True return True class TestCase43(MyTestCase, StateGenerator): def get_new_edge(self, socket, config): assert config.TestCase43["local_kwarg"] == "world" return EcuState(session=1), EcuState(session=2) def get_transition_function(self, socket, edge): assert edge[0] == EcuState(session=1) assert edge[1] == EcuState(session=2) return transition_func, {"arg42": "hello"}, cleanup_func def execute(self, socket, state, **kwargs): return True tce = Scanner(MockSock(), test_cases=[TestCase43], TestCase43_kwargs={"local_kwarg": "world"}) assert len(tce.final_states) == 1 tce.execute_test_case(TestCase43()) assert len(tce.final_states) == 2 assert EcuState(session=1) in tce.final_states and EcuState(session=2) in tce.final_states assert not len(tce.cleanup_functions) assert tce.enter_state(EcuState(session=1), EcuState(session=2)) assert transition_done assert len(tce.cleanup_functions) tce.cleanup_state() assert not len(tce.cleanup_functions) assert cleanup_done = TestCase execute StateGenerator with not callable cleanupfunc transition_done = False def transition_func(sock, conf, kwargs): assert kwargs["arg42"] == "hello" assert conf.TestCase43["local_kwarg"] == "world" global transition_done transition_done = True return True class TestCase43(MyTestCase, StateGenerator): def get_new_edge(self, socket, config): assert config.TestCase43["local_kwarg"] == "world" return EcuState(session=1), EcuState(session=2) def get_transition_function(self, socket, edge): assert edge[0] == EcuState(session=1) assert edge[1] == EcuState(session=2) return transition_func, {"arg42": "hello"}, "fake" def execute(self, socket, state, **kwargs): return True tce = Scanner(MockSock(), test_cases=[TestCase43], TestCase43_kwargs={"local_kwarg": "world"}) assert len(tce.final_states) == 1 tce.execute_test_case(TestCase43()) assert len(tce.final_states) == 2 assert EcuState(session=1) in tce.final_states and EcuState(session=2) in tce.final_states assert not len(tce.cleanup_functions) assert tce.enter_state(EcuState(session=1), EcuState(session=2)) assert transition_done assert len(tce.cleanup_functions) tce.cleanup_state() assert not len(tce.cleanup_functions) = TestCase execute StateGenerator with cleanupfunc negative return transition_done = False cleanup_done = False def transition_func(sock, conf, kwargs): assert kwargs["arg42"] == "hello" assert conf.TestCase43["local_kwarg"] == "world" global transition_done transition_done = True return True def cleanup_func(sock, conf): assert conf.TestCase43["local_kwarg"] == "world" global cleanup_done cleanup_done = True return False class TestCase43(MyTestCase, StateGenerator): def get_new_edge(self, socket, config): assert config.TestCase43["local_kwarg"] == "world" return EcuState(session=1), EcuState(session=2) def get_transition_function(self, socket, edge): assert edge[0] == EcuState(session=1) assert edge[1] == EcuState(session=2) return transition_func, {"arg42": "hello"}, cleanup_func def execute(self, socket, state, **kwargs): return True tce = Scanner(MockSock(), test_cases=[TestCase43], TestCase43_kwargs={"local_kwarg": "world"}) assert len(tce.final_states) == 1 tce.execute_test_case(TestCase43()) assert len(tce.final_states) == 2 assert EcuState(session=1) in tce.final_states and EcuState(session=2) in tce.final_states assert not len(tce.cleanup_functions) assert tce.enter_state(EcuState(session=1), EcuState(session=2)) assert transition_done assert len(tce.cleanup_functions) tce.cleanup_state() assert not len(tce.cleanup_functions) assert cleanup_done = TestCase execute StateGenerator with cleanupfunc and path transition_done1 = False cleanup_done1 = False transition_done2 = False cleanup_done2 = False transition_error = False def transition_func1(sock, conf, kwargs): global transition_done1 transition_done1 = True return True def cleanup_func1(sock, conf): global cleanup_done1 cleanup_done1 = True return True def transition_func2(sock, conf, kwargs): global transition_done2 transition_done2 = True return not transition_error def cleanup_func2(sock, conf): global cleanup_done2 cleanup_done2 = True return True class TestCase43(MyTestCase, StateGenerator): def get_new_edge(self, socket, config): return EcuState(session=1), EcuState(session=2) def get_transition_function(self, socket, edge): return transition_func1, {"arg42": "hello"}, cleanup_func1 def execute(self, socket, state, **kwargs): return True class TestCase44(MyTestCase, StateGenerator): def get_new_edge(self, socket, config): return EcuState(session=2), EcuState(session=3) def get_transition_function(self, socket, edge): return transition_func2, None, cleanup_func2 def execute(self, socket, state, **kwargs): return True reset_done = False def reset_func(): global reset_done reset_done = True reconnect_done = False def reconnect_func(): global reconnect_done reconnect_done = True return MockSock() tce = Scanner(MockSock(), test_cases=[TestCase43, TestCase44], reset_handler=reset_func, reconnect_handler=reconnect_func) assert len(tce.final_states) == 1 tce.execute_test_case(TestCase43()) assert len(tce.final_states) == 2 assert EcuState(session=1) in tce.final_states and EcuState(session=2) in tce.final_states tce.execute_test_case(TestCase44()) assert len(tce.final_states) == 3 assert EcuState(session=3) in tce.final_states and EcuState(session=2) in tce.final_states assert not len(tce.cleanup_functions) assert tce.enter_state_path([EcuState(session=1), EcuState(session=2), EcuState(session=3)]) assert transition_done1 assert transition_done2 assert len(tce.cleanup_functions) == 2 assert reconnect_done assert reset_done tce.cleanup_state() assert cleanup_done1 assert cleanup_done2 try: tce.enter_state_path([EcuState(session=3)]) assert False except Scapy_Exception: assert True = Test downrate edge transition_done1 = False cleanup_done1 = False tce = Scanner(MockSock(), test_cases=[TestCase43, TestCase44], reset_handler=reset_func, reconnect_handler=reconnect_func) assert len(tce.final_states) == 1 tce.execute_test_case(TestCase43()) assert len(tce.final_states) == 2 assert EcuState(session=1) in tce.final_states and EcuState(session=2) in tce.final_states tce.execute_test_case(TestCase44()) assert len(tce.final_states) == 3 assert EcuState(session=3) in tce.final_states and EcuState(session=2) in tce.final_states assert not len(tce.cleanup_functions) transition_error = True assert not tce.enter_state_path([EcuState(session=1), EcuState(session=2), EcuState(session=3)]) assert transition_done1 assert cleanup_done1 assert len(tce.cleanup_functions) == 0 assert tce.state_graph.weights[(EcuState(session=1), EcuState(session=2))] == 1 assert tce.state_graph.weights[(EcuState(session=2), EcuState(session=3))] == 2 = TestCase execute TestCaseGenerator tc_executed = False class GeneratedTestCase(MyTestCase): def execute(self, socket, state, **kwargs): assert kwargs["local_kwarg"] == "world" global tc_executed tc_executed = True return True class TestCase43(MyTestCase, TestCaseGenerator): def execute(self, socket, state, **kwargs): return True def get_generated_test_case(self): return GeneratedTestCase() tce = Scanner(MockSock(), test_cases=[TestCase43], GeneratedTestCase_kwargs={"local_kwarg": "world"}) assert len(tce.final_states) == 1 assert len(tce.configuration.test_cases) == 1 tce.execute_test_case(tce.configuration.test_cases[0]) assert len(tce.configuration.test_cases) == 2 tce.execute_test_case(tce.configuration.test_cases[1]) assert tc_executed = TestCase scan timeout tc_executed = False class GeneratedTestCase(MyTestCase): def execute(self, socket, state, **kwargs): assert kwargs["local_kwarg"] == "world" global tc_executed tc_executed = True return True tce = Scanner(MockSock(), test_cases=[GeneratedTestCase], GeneratedTestCase_kwargs={"local_kwarg": "world"}) assert len(tce.final_states) == 1 assert len(tce.configuration.test_cases) == 1 tce.scan(-1) assert not tc_executed = TestCase scan tc_executed = False class GeneratedTestCase(MyTestCase): def execute(self, socket, state, **kwargs): assert kwargs["local_kwarg"] == "world" global tc_executed tc_executed = True self._state_completed[state] = True return True class TestCase43(MyTestCase, TestCaseGenerator): def execute(self, socket, state, **kwargs): self._state_completed[state] = True return True def get_generated_test_case(self): return GeneratedTestCase() tce = Scanner(MockSock(), test_cases=[TestCase43], GeneratedTestCase_kwargs={"local_kwarg": "world"}) assert len(tce.final_states) == 1 assert len(tce.configuration.test_cases) == 1 tce.scan() assert len(tce.configuration.test_cases) == 2 assert tc_executed assert tce.scan_completed = Test supported responses class MyTestCase1(AutomotiveTestCase): _description = "MyTestCase1" _supported_kwargs = copy.copy(AutomotiveTestCase._supported_kwargs) _supported_kwargs.update({ 'stop_event': (threading.Event, None), # type: ignore }) @property def supported_responses(self): return [EcuResponse(EcuState(session=2), responses=UDS() / UDS_RDBIPR(dataIdentifier=2) / Raw(b"de")), EcuResponse([EcuState(session=2), EcuState(security_level=6)], responses=UDS() / UDS_RDBIPR(dataIdentifier=3) / Raw(b"dea2")), EcuResponse(EcuState(session=range(0,255)), responses=UDS() / UDS_NR(negativeResponseCode=0x7f, requestServiceId=0x13))] class MyTestCase2(AutomotiveTestCase): _description = "MyTestCase2" _supported_kwargs = copy.copy(AutomotiveTestCase._supported_kwargs) _supported_kwargs.update({ 'stop_event': (threading.Event, None), # type: ignore }) @property def supported_responses(self): return [EcuResponse(EcuState(session=2), responses=UDS() / UDS_RDBIPR(dataIdentifier=5) / Raw(b"deadbeef1")), EcuResponse(EcuState(session=2), responses=UDS() / UDS_RDBIPR(dataIdentifier=6) / Raw(b"deadbeef2")), EcuResponse(EcuState(session=range(0,255)), responses=UDS() / UDS_NR(negativeResponseCode=0x10, requestServiceId=0x11))] tce = Scanner(MockSock(), test_cases=[MyTestCase1(), MyTestCase2()]) resps = tce.supported_responses assert len(resps) == 6 assert resps[0].responses[0].service != 0x7f assert resps[1].responses[0].service != 0x7f assert resps[2].responses[0].service != 0x7f assert resps[3].responses[0].service != 0x7f assert resps[4].responses[0].service == 0x7f assert resps[5].responses[0].service == 0x7f assert resps[0].responses[0].load == b"dea2" assert resps[1].responses[0].load == b"deadbeef1" assert resps[2].responses[0].load == b"deadbeef2" assert resps[3].responses[0].load == b"de" assert resps[4].responses[0].requestServiceId == 0x13 assert resps[5].responses[0].requestServiceId == 0x11 = Test show testcases try: tce.show_testcases() assert True except Exception: assert False try: tce.show_testcases_status() assert True except Exception: assert False = Test StateGeneratingServiceEnumerator class TestCase43(MyTestCase, StateGeneratingServiceEnumerator): def execute(self, socket, state, **kwargs): return True @property def results(self): # type: () -> List[_AutomotiveTestCaseScanResult] return [_AutomotiveTestCaseScanResult(EcuState(session=1), UDS()/UDS_DSC(b"\x03"), UDS()/UDS_DSCPR(b"\x03"), 1.1, 1.2)] tce = Scanner(MockSock(), test_cases=[TestCase43]) assert len(tce.final_states) == 1 tce.execute_test_case(TestCase43()) assert len(tce.final_states) == 2 assert EcuState(session=1) in tce.final_states and EcuState(session=3) in tce.final_states tf, args, cf = tce.state_graph.get_transition_tuple_for_edge((EcuState(session=1), EcuState(session=3))) assert cf is None assert tf is not None assert len(args) == 2 assert args["req"] == UDS()/UDS_DSC(b"\x03") assert "diagnosticSessionType" in args["desc"] and "extendedDiagnosticSession" in args["desc"] assert not tce.enter_state(EcuState(session=1), EcuState(session=3))