% Regression tests for enumerators
~ linux

+ Load general modules

= Load contribution layer

from scapy.contrib.automotive.scanner.enumerator import _AutomotiveTestCaseScanResult, ServiceEnumerator, StateGenerator, StateGeneratingServiceEnumerator
from scapy.contrib.automotive.scanner.test_case import TestCaseGenerator, AutomotiveTestCase
from scapy.contrib.automotive.scanner.executor import AutomotiveTestCaseExecutor
from scapy.contrib.isotp import ISOTP
from scapy.contrib.automotive.uds import *
from scapy.contrib.automotive.scanner.staged_test_case import StagedAutomotiveTestCase
from scapy.utils import SingleConversationSocket
from scapy.contrib.automotive.ecu import EcuState, EcuResponse
from scapy.contrib.automotive.uds_ecu_states import *
import copy

+ Basic checks
= ServiceEnumerator basecls checks

# Four hand-built scan results. The positional arguments appear to be
# (state, req, resp, req_ts, resp_ts), matching the attributes read in the
# loop further down. The second entry has no response (None).
pkts = [
    _AutomotiveTestCaseScanResult(EcuState(session=1), UDS(b"\x20abcd"), UDS(b"\x60abcd"), 1.0, 1.9),
    _AutomotiveTestCaseScanResult(EcuState(session=2), UDS(b"\x20abcd"), None, 2.0, None),
    _AutomotiveTestCaseScanResult(EcuState(session=1), UDS(b"\x21abcd"), UDS(b"\x7fabcd"), 3.0, 3.1),
    _AutomotiveTestCaseScanResult(EcuState(session=2), UDS(b"\x21abcd"), UDS(b"\x7fa\x10cd"), 4.0, 4.5),
]

# Minimal concrete ServiceEnumerator that every test in this file builds on.
# NOTE(review): class and function bodies in this file contain no blank lines
# and each compound statement is followed by one; presumably the test runner
# feeds the code to an interactive-style interpreter -- keep that layout.
class MyTestCase(ServiceEnumerator):
    # Extend a copy of the inherited kwargs table so the base class table is
    # not mutated; the three extra keys are the ones the configuration tests
    # below pass in.
    _supported_kwargs = copy.copy(ServiceEnumerator._supported_kwargs)
    _supported_kwargs.update({
        'local_kwarg': ((int, str), None),
        'verbose': (bool, None),
        'global_arg': (str, None)
    })
    def _get_initial_requests(self, **kwargs):
        # type: (Any) -> Iterable[Packet]
        # One UDS packet whose service field is the range 1..10 (the upper
        # bound 11 is exclusive).
        return UDS(service=range(1, 11))
    def _get_table_entry_y(self, tup):
        # type: (_AutomotiveTestCaseScanResult) -> str
        # Row label built from the request (index 1): hex service id plus the
        # sprintf rendering of the service field.
        return "0x%02x: %s" % (tup[1].service, tup[1].sprintf("%UDS.service%"))
    def _get_table_entry_z(self, tup):
        # type: (_AutomotiveTestCaseScanResult) -> str
        # Cell label built from the response (index 2); "PR: Supported" is the
        # text passed for the positive case.
        return self._get_label(tup[2], "PR: Supported")
    @staticmethod
    def _get_negative_response_label(response):
        # type: (Packet) -> str
        return response.sprintf("NR: %UDS_NR.negativeResponseCode%")
    @staticmethod
    def _get_negative_response_code(resp):
        # type: (Packet) -> int
        return resp.negativeResponseCode
    @staticmethod
    def _get_negative_response_desc(nrc):
        # type: (int) -> str
        # Build a throw-away UDS_NR only to render the code via sprintf.
        return UDS_NR(negativeResponseCode=nrc).sprintf(
            "%UDS_NR.negativeResponseCode%")


# Copy the recorded timestamps onto the packets, then store every result
# (including the unanswered one) in a fresh enumerator.
e = MyTestCase()
for p in pkts:
    p.req.time = p.req_ts
    p.req.sent_time = p.req_ts
    if p.resp is not None:
        p.resp.time = p.resp_ts
    e._store_result(p.state, p.req, p.resp)


= ServiceEnumerator not completed check

assert e.completed == False

= ServiceEnumerator completed

# Mark both stored states as completed by hand.
e._state_completed[EcuState(session=1)] = True
e._state_completed[EcuState(session=2)] = True

assert e.completed

= ServiceEnumerator stats check

stat_list = e._compute_statistics()

# Keep only the aggregate rows (state == "all") as a label -> value mapping.
stats = {label: value for state, label, value in stat_list if state == "all"}
print(stats)

# Expected values follow from pkts: three answered, one unanswered, two
# responses starting with 0x7f. The answer times presumably are
# resp_ts - req_ts, i.e. 0.9, 0.1 and 0.5. Values are compared as strings.
assert stats["num_answered"] == '3'
assert stats["num_unanswered"] == '1'
assert stats["answertime_max"] == '0.9'
assert stats["answertime_min"] == '0.1'
assert stats["answertime_avg"] == '0.5'
assert stats["num_negative_resps"] == '2'

= ServiceEnumerator scanned states

assert len(e.scanned_states) == 2
assert {EcuState(session=1), EcuState(session=2)} == e.scanned_states

= ServiceEnumerator scanned results

assert len(e.results_with_positive_response) == 1
assert len(e.results_with_negative_response) == 2
assert len(e.results_without_response) == 1
assert len(e.results_with_response) == 3

= ServiceEnumerator get_label
# The second argument of _get_label may be omitted or be a callable; a
# missing response (pkts[1]) is labelled "Timeout".
assert e._get_label(pkts[0].resp) == "PR: PositiveResponse"
assert e._get_label(pkts[0].resp, lambda _: "positive") == "positive"
assert e._get_label(pkts[0].resp, lambda _: "positive" + hex(pkts[0].req.service)) == "positive" + "0x20"
assert e._get_label(pkts[1].resp) == "Timeout"
assert e._get_label(pkts[2].resp) == "NR: 98"
assert e._get_label(pkts[3].resp) == "NR: generalReject"

= ServiceEnumerator show

e.show(filtered=False)

# With dump=True the table is returned as a string instead of only printed.
dump = e.show(dump=True, filtered=False)
assert "NR: 98" in dump
assert "NR: generalReject" in dump
assert "PR: Supported" in dump
assert "Timeout" in dump
assert "session1" in dump
assert "session2" in dump
assert "0x20" in dump
assert "0x21" in dump

= ServiceEnumerator filtered results before show

print(len(e.filtered_results))
assert len(e.filtered_results) == 2
assert e.filtered_results[0] == pkts[0]
assert e.filtered_results[1] == pkts[2]

= ServiceEnumerator show filtered

e.show(filtered=True)

# The filtered view drops the timeout row and reports the blacklisted codes.
dump = e.show(dump=True, filtered=True)
assert "NR: 98" in dump
assert "NR: generalReject" in dump
assert "PR: Supported" in dump
assert "Timeout" not in dump
assert "session1" in dump
assert "session2" in dump
assert "all" in dump
assert "0x20" in dump
assert "0x21" in dump
assert "The following negative response codes are blacklisted: ['serviceNotSupported']" in dump

= ServiceEnumerator filtered results after show

# NOTE(review): the count is 2 before the filtered show and 3 after it, so
# show(filtered=True) apparently changes what filtered_results yields --
# confirm against the enumerator implementation.
assert len(e.filtered_results) == 3
assert e.filtered_results[0] == pkts[0]
assert e.filtered_results[1] == pkts[2]

= ServiceEnumerator supported responses

assert len(e.supported_responses) == 3

= ServiceEnumerator evaluate response

# NOTE(review): this rebinds the name `conf` in the test namespace to a plain
# dict of keyword arguments; verify nothing later expects another `conf`.
conf = {}

# Default kwargs: neither a missing response nor 0x7f/0x10 yields True.
assert False == e._evaluate_response(EcuState(session=1), UDS(b"\x10\x03abcd"), None, **conf)
assert False == e._evaluate_response(EcuState(session=1), UDS(b"\x10\x03abcd"), UDS(b"\x7f\x10\x10"), **conf)
# exit_if_service_not_supported: only the codes 0x11 and 0x7f yield True.
conf = {"exit_if_service_not_supported": True, "retry_if_busy_returncode": False}
assert False == e._evaluate_response(EcuState(session=1), UDS(b"\x10\x03abcd"), UDS(b"\x7f\x10\x21"), **conf)
assert False == e._evaluate_response(EcuState(session=1), UDS(b"\x10\x03abcd"), UDS(b"\x7f\x10\x10"), **conf)
assert True == e._evaluate_response(EcuState(session=1), UDS(b"\x10\x03abcd"), UDS(b"\x7f\x10\x11"), **conf)
assert True == e._evaluate_response(EcuState(session=1), UDS(b"\x10\x03abcd"), UDS(b"\x7f\x10\x7f"), **conf)
# retry_if_busy_returncode: code 0x21 stores the request in _retry_pkt and
# yields True once; the second 0x21 for the same request yields False and
# the retry slot is empty again.
conf = {"exit_if_service_not_supported": False, "retry_if_busy_returncode": True}
assert not e._retry_pkt[EcuState(session=1)]
assert False == e._evaluate_response(EcuState(session=1), UDS(b"\x10\x03abcd"), UDS(b"\x7f\x10\x10"), **conf)
assert False == e._evaluate_response(EcuState(session=1), UDS(b"\x10\x03abcd"), UDS(b"\x7f\x10\x11"), **conf)
assert False == e._evaluate_response(EcuState(session=1), UDS(b"\x10\x03abcd"), UDS(b"\x7f\x10\x7f"), **conf)
assert not e._retry_pkt[EcuState(session=1)]
assert True == e._evaluate_response(EcuState(session=1), UDS(b"\x10\x03abcd"), UDS(b"\x7f\x10\x21"), **conf)
assert e._retry_pkt[EcuState(session=1)] == UDS(b"\x10\x03abcd")
assert False == e._evaluate_response(EcuState(session=1), UDS(b"\x10\x03abcd"), UDS(b"\x7f\x10\x21"), **conf)
assert not e._retry_pkt[EcuState(session=1)]

# Positive responses: the 0x10/0x50 pair yields True, the 0x11/0x51 pair
# yields False.
assert True == e._evaluate_response(EcuState(session=1), UDS(b"\x10\x03abcd"), UDS(b"\x50\x03\x00"), **conf)
assert False == e._evaluate_response(EcuState(session=1), UDS(b"\x11\x03abcd"), UDS(b"\x51\x03\x00"), **conf)
# retry_if_none_received: a missing response schedules a retry.
conf = {"retry_if_none_received": True}
assert True == e._evaluate_response(EcuState(session=1), UDS(b"\x10\x03abcd"), None, **conf)
assert e._retry_pkt[EcuState(session=1)]


= ServiceEnumerator execute

from queue import Queue
from scapy.supersocket import SuperSocket

# Queue-backed stand-in for a socket: responses are pre-loaded into
# rcvd_queue and everything sent is recorded in sent_queue.
class MockISOTPSocket(SuperSocket):
    nonblocking_socket = True
    # `closed` always reads False; assignments to it are ignored.
    @property
    def closed(self):
        return False
    @closed.setter
    def closed(self, var):
        pass
    def __init__(self, rcvd_queue=None):
        # rcvd_queue: optional iterable of items to pre-load as responses.
        self.rcvd_queue = Queue()
        self.sent_queue = Queue()
        if rcvd_queue is not None:
            for c in rcvd_queue:
                self.rcvd_queue.put(c)
    def recv_raw(self, x=MTU):
        # Waits at most 0.01 s for a queued response; Queue.get raises
        # queue.Empty when nothing arrives in time. Returns the tuple
        # (UDS, payload bytes, 10.0).
        pkt = bytes(self.rcvd_queue.get(True, 0.01))
        return UDS, pkt, 10.0
    def send(self, x):
        # Records the raw bytes and returns their length; sent_time is set to
        # the fixed value 9.0 when the object accepts that attribute.
        sx = raw(x)
        try:
            x.sent_time = 9.0
        except AttributeError:
            pass
        self.sent_queue.put(sx)
        return len(sx)
    @staticmethod
    def select(sockets, remain=None):
        # Reports every socket as ready; sleep(0) only yields the thread.
        time.sleep(0)
        return sockets
    def sr(self, *args, **kargs):
        from scapy import sendrecv
        return sendrecv.sndrcv(self, *args, threaded=False, **kargs)
    def sr1(self, *args, **kargs):
        # Same as sr() but returns only the first answer, or None if there is
        # no answered packet.
        from scapy import sendrecv
        ans = sendrecv.sndrcv(self, *args, threaded=False, **kargs)[0] # type: SndRcvList
        if len(ans) > 0:
            pkt = ans[0][1] # type: Packet
            return pkt
        else:
            return None

# Ten single-byte responses 0x41..0x4A, presumably the positive responses
# (service + 0x40) to the ten requests MyTestCase generates.
sock = MockISOTPSocket()
sock.rcvd_queue.put(b"\x41")
sock.rcvd_queue.put(b"\x42")
sock.rcvd_queue.put(b"\x43")
sock.rcvd_queue.put(b"\x44")
sock.rcvd_queue.put(b"\x45")
sock.rcvd_queue.put(b"\x46")
sock.rcvd_queue.put(b"\x47")
sock.rcvd_queue.put(b"\x48")
sock.rcvd_queue.put(b"\x49")
sock.rcvd_queue.put(b"\x4A")

e = MyTestCase()

e.execute(sock, EcuState(session=1))

assert len(e.filtered_results) == 10
assert len(e.results_with_response) == 10
assert len(e.results_without_response) == 0

assert e.has_completed(EcuState(session=1))
assert e.completed

# The queue is empty now, so all ten requests of session 2 stay unanswered.
e.execute(sock, EcuState(session=2), timeout=0.01)

assert len(e.filtered_results) == 10
assert len(e.results_with_response) == 10
assert len(e.results_without_response) == 10

assert e.has_completed(EcuState(session=2))

# With exit_if_no_answer_received the state is scanned but not completed.
e.execute(sock, EcuState(session=3), timeout=0.01, exit_if_no_answer_received=True)

assert not e.has_completed(EcuState(session=3))
assert not e.completed
assert len(e.scanned_states) == 3

# A state on the block list counts as completed without being scanned.
e.execute(sock, EcuState(session=42), state_block_list=[EcuState(session=42)])

assert e.has_completed(EcuState(session=42))
assert len(e.scanned_states) == 3

# Continuation of the "ServiceEnumerator execute" test: the block list and
# the allow list are accepted both as a list and as a single EcuState; in
# every case the state is reported as completed while the number of scanned
# states stays at 3.
e.execute(sock, EcuState(session=13), state_block_list=EcuState(session=13))

assert e.has_completed(EcuState(session=13))
assert len(e.scanned_states) == 3

e.execute(sock, EcuState(session=41), state_allow_list=[EcuState(session=42)])

assert e.has_completed(EcuState(session=41))
assert len(e.scanned_states) == 3

e.execute(sock, EcuState(session=12), state_allow_list=EcuState(session=13))

assert e.has_completed(EcuState(session=12))
assert len(e.scanned_states) == 3

= Test negative response code service not supported

# One negative response per execute() call below: codes 0x11 and 0x7f.
sock.rcvd_queue.put(b"\x7f\x01\x11")
sock.rcvd_queue.put(b"\x7f\x01\x7f")

e = MyTestCase()

# With exit_if_service_not_supported the scan of a state ends after the first
# such response: one result per state, no retry pending, state completed.
e.execute(sock, EcuState(session=1), exit_if_service_not_supported=True)

assert not e._retry_pkt[EcuState(session=1)]
assert len(e.results_with_response) == 1
assert len(e.results_with_negative_response) == 1
assert e.completed

e.execute(sock, EcuState(session=2), exit_if_service_not_supported=True)

assert not e._retry_pkt[EcuState(session=2)]
assert len(e.results_with_response) == 2
assert len(e.results_with_negative_response) == 2
assert e.completed

= Test negative response code retry if busy

# First response carries code 0x21, the second one code 0x10.
sock.rcvd_queue.put(b"\x7f\x01\x21")
sock.rcvd_queue.put(b"\x7f\x01\x10")

e = MyTestCase()

# First run stops after the 0x21 response with a retry packet pending and the
# state not completed.
e.execute(sock, EcuState(session=1))

assert e._retry_pkt[EcuState(session=1)]
assert len(e.results_with_response) == 1
assert len(e.results_with_negative_response) == 1
assert len(e.results_without_response) == 0
assert not e.completed

# Second run on the same state: one more response is recorded, the nine
# remaining requests get no answer, and the state completes.
e.execute(sock, EcuState(session=1))

assert not e._retry_pkt[EcuState(session=1)]
assert len(e.results_with_response) == 2
assert len(e.results_with_negative_response) == 2
assert len(e.results_without_response) == 9
assert e.completed
assert e.has_completed(EcuState(session=1))

= Test negative response code don't retry if busy

# A single response with code 0x21 is queued for this run.
sock.rcvd_queue.put(b"\x7f\x01\x21")

e = MyTestCase()

# With retry_if_busy_returncode=False the scan continues past the 0x21
# response: nine requests stay unanswered and the state completes.
e.execute(sock, EcuState(session=1), retry_if_busy_returncode=False)

assert not e._retry_pkt[EcuState(session=1)]
assert len(e.results_with_response) == 1
assert len(e.results_with_negative_response) == 1
assert len(e.results_without_response) == 9
assert e.completed
assert e.has_completed(EcuState(session=1))

= Test execution time

sock.rcvd_queue.put(b"\x7f\x01\x10")

e = MyTestCase()

# A negative execution_time ends the run after the first request; the state
# is left uncompleted.
e.execute(sock, EcuState(session=1), execution_time=-1)

assert not e._retry_pkt[EcuState(session=1)]
assert len(e.results_with_response) == 1
assert len(e.results_with_negative_response) == 1
assert len(e.results_without_response) == 0
assert not e.completed
assert not e.has_completed(EcuState(session=1))


+ AutomotiveTestCaseExecutorConfiguration tests

= Definitions

# Socket stub for the executor tests: never closed, and any sr1() call raises
# OSError.
class MockSock(object):
    closed = False
    def sr1(self, *args, **kwargs):
        raise OSError

# Two subclasses that differ from MyTestCase only by name, so that per-class
# keyword arguments (e.g. TestCase1_kwargs) can be told apart.
class TestCase1(MyTestCase):
    pass

class TestCase2(MyTestCase):
    pass

# Executor under test; falls back to [MyTestCase] when no test cases are
# passed to the constructor (see "Basic tests with default values").
class Scanner(AutomotiveTestCaseExecutor):
    @property
    def default_test_case_clss(self):
        # type: () -> List[Type[AutomotiveTestCaseABC]]
        return [MyTestCase]

= Basic tests

# global_arg is given once and is expected in the configuration of all three
# test cases; local_kwarg is scoped to TestCase1 via TestCase1_kwargs.
tce = Scanner(MockSock(), test_cases=[TestCase1, TestCase2, MyTestCase],
              verbose=True, debug=True,
              global_arg="Whatever", TestCase1_kwargs={"local_kwarg": 42})

config = tce.configuration # type: AutomotiveTestCaseExecutorConfiguration
assert config.verbose
assert config.debug
assert len(config.test_cases) == 3
assert len(config.stages) == 0
assert len(config.staged_test_cases) == 0
assert len(config.test_case_clss) == 3
# TestCase1 has one entry more than the others (its local_kwarg); the
# per-class configuration is reachable as attribute and by key.
assert len(config.TestCase1.items()) == 5
assert len(config.TestCase2.items()) == 4
assert len(config["TestCase1"].items()) == 5
assert len(config.MyTestCase.items()) == 4
assert config.TestCase1["verbose"]
assert config.TestCase1["debug"]
assert config.TestCase1["local_kwarg"] == 42
assert config.TestCase1["global_arg"] == "Whatever"
assert config.TestCase2["global_arg"] == "Whatever"
assert config.MyTestCase["global_arg"] == "Whatever"
# The executor wraps the socket it was given in a SingleConversationSocket.
assert isinstance(tce.socket, SingleConversationSocket)


= Basic tests with default values

# No test_cases argument: the single default test case (MyTestCase) is used.
tce = Scanner(MockSock())

config = tce.configuration # type: AutomotiveTestCaseExecutorConfiguration
assert not config.verbose
assert not config.debug
assert len(config.test_cases) == 1
assert len(config.MyTestCase.items()) == 1
assert isinstance(tce.socket, SingleConversationSocket)


= Basic test with stages

# Connector between two stages: derives kwargs from the first stage's results.
def connector(testcase1, _):
    scan_range = len(testcase1.results)
    return {"verbose": True, "scan_range": scan_range}

tc1 = TestCase1()
tc2 = TestCase2()

# Second argument: one connector slot per stage, None for the first stage.
pipeline = StagedAutomotiveTestCase([tc1, tc2], [None, connector])

tce = Scanner(MockSock(), test_cases=[pipeline])

config = tce.configuration # type: AutomotiveTestCaseExecutorConfiguration
assert not config.verbose
assert not config.debug
assert len(config.test_cases) == 1
assert len(config.stages) == 1
assert len(config.staged_test_cases) == 2
assert len(config.test_case_clss) == 3
assert len(config.StagedAutomotiveTestCase.items()) == 1
assert isinstance(tce.socket, SingleConversationSocket)

= Basic tests with two stages

def connector(testcase1, testcase2):
    scan_range = len(testcase1.results)
    return {"verbose": True, "scan_range": scan_range}

tc1 = TestCase1()
tc2 = TestCase2()

pipeline = StagedAutomotiveTestCase([tc1, tc2], [None, connector])

# A differently named pipeline class gets its own configuration entry.
class StagedTest(StagedAutomotiveTestCase):
    pass

pipeline2 = StagedTest([MyTestCase(), MyTestCase()])

tce = Scanner(MockSock(), test_cases=[pipeline, pipeline2], verbose=True)

config = tce.configuration # type: AutomotiveTestCaseExecutorConfiguration
assert config.verbose
assert not config.debug
assert len(config.test_cases) == 2
assert len(config.stages) == 2
assert len(config.staged_test_cases) == 4
assert len(config.test_case_clss) == 5
assert len(config.StagedAutomotiveTestCase.items()) == 2
assert len(config.StagedTest.items()) == 2
assert len(config.TestCase1.items()) == 2
assert len(config.TestCase2.items()) == 2
assert len(config.MyTestCase.items()) == 2

assert isinstance(tce.socket, SingleConversationSocket)

# A fresh executor knows exactly one state and one path.
assert len(tce.state_paths) == 1
assert len(tce.final_states) == 1

# Adding an edge to session 2 by hand yields a second path and final state.
tce.state_graph.add_edge((tce.final_states[0], EcuState(session=2)))

assert len(tce.state_paths) == 2
assert len(tce.final_states) == 2

assert not tce.scan_completed


= Reset Handler tests

reset_flag = False

def reset_func():
    global reset_flag
    reset_flag = True

# reset_target() must call the handler and put target_state back to session 1.
tce = Scanner(MockSock(), reset_handler=reset_func)
tce.target_state = EcuState(session=2)
tce.reset_target()

assert reset_flag
assert tce.target_state == EcuState(session=1)

= Reset Handler tests 2

# Without a reset handler the target state is still reset to session 1.
tce = Scanner(MockSock())
tce.target_state = EcuState(session=2)
tce.reset_target()

assert tce.target_state == EcuState(session=1)

= Reconnect Handler tests

class MockSocket2:
    closed = False

def reconnect_func():
    return MockSocket2()

tce = Scanner(MockSock(), reconnect_handler=reconnect_func)

# reconnect() swaps the wrapped socket for the one the handler returns.
print(tce.socket)
print(repr(tce.socket))
assert isinstance(tce.socket._inner, MockSock)
tce.reconnect()
assert isinstance(tce.socket._inner, MockSocket2)

= Reconnect Handler tests 2

# Module-level flag set by MockSocket1.close(); distinct from the class
# attribute of the same name.
closed = False

class MockSocket1:
    closed = False
    def close(self):
        global closed
        closed = True

class MockSocket2:
    closed = False

def reconnect_func():
    return MockSocket2()

tce = Scanner(MockSocket1(), reconnect_handler=reconnect_func)

# In addition to the swap, the old socket's close() must have been called.
print(tce.socket)
print(repr(tce.socket))
assert isinstance(tce.socket._inner, MockSocket1)
tce.reconnect()
assert isinstance(tce.socket._inner, MockSocket2)
assert closed

= TestCase execute

# Flags set by the three hooks of TestCase42 below.
pre_exec = False
execute = False
post_exec = False

# Checks that pre_execute, execute and post_execute are all invoked and that
# a change made to the configuration in pre_execute (local_kwarg 42 -> 1) is
# what execute and post_execute observe.
class TestCase42(MyTestCase):
    def pre_execute(self,
                    socket, # type: _SocketUnion
                    state, # type: EcuState
                    global_configuration # type: AutomotiveTestCaseExecutorConfiguration # noqa: E501
                    ): # type: (...) -> None
        global pre_exec
        assert state == EcuState(session=1)
        assert global_configuration.TestCase42["local_kwarg"] == 42
        assert global_configuration.TestCase42["verbose"]
        assert global_configuration.TestCase42["debug"]
        global_configuration.TestCase42["local_kwarg"] = 1
        pre_exec = True
    def execute(self, socket, state, local_kwarg, verbose, debug, **kwargs):
        # `global execute` refers to the module-level flag, not this method.
        global execute
        assert verbose
        assert debug
        assert local_kwarg == 1
        execute = True
    def post_execute(self,
                     socket, # type: _SocketUnion
                     state, # type: EcuState
                     global_configuration # type: AutomotiveTestCaseExecutorConfiguration # noqa: E501
                     ): # type: (...) -> None
        global post_exec
        assert global_configuration.TestCase42["local_kwarg"] == 1
        assert global_configuration.TestCase42["verbose"]
        assert global_configuration.TestCase42["debug"]
        post_exec = True


tce = Scanner(MockSock(), test_cases=[TestCase42],
              verbose=True, debug=True,
              TestCase42_kwargs={"local_kwarg": 42})

tce.execute_test_case(TestCase42())
assert pre_exec == execute == post_exec == True


= TestCase execute StateGenerator

transition_done = False

# Transition callback; receives the socket, the executor configuration and
# the kwargs dict returned by get_transition_function.
def transition_func(sock, conf, kwargs):
    assert kwargs["arg42"] == "hello"
    assert conf.TestCase43["local_kwarg"] == "world"
    global transition_done
    transition_done = True
    return True

# StateGenerator that announces the edge session 1 -> session 2 and supplies
# (transition function, its kwargs, cleanup function); no cleanup here.
class TestCase43(MyTestCase, StateGenerator):
    def get_new_edge(self, socket, config):
        assert config.TestCase43["local_kwarg"] == "world"
        return EcuState(session=1), EcuState(session=2)
    def get_transition_function(self, socket, edge):
        assert edge[0] == EcuState(session=1)
        assert edge[1] == EcuState(session=2)
        return transition_func, {"arg42": "hello"}, None
    def execute(self, socket, state, **kwargs):
        return True


tce = Scanner(MockSock(), test_cases=[TestCase43],
              TestCase43_kwargs={"local_kwarg": "world"})

assert len(tce.final_states) == 1

tce.execute_test_case(TestCase43())

# The new edge adds session 2; entering it runs transition_func.
assert len(tce.final_states) == 2
assert EcuState(session=1) in tce.final_states and EcuState(session=2) in tce.final_states
assert tce.enter_state(EcuState(session=1), EcuState(session=2))
assert transition_done


= TestCase execute StateGenerator no edge

# get_new_edge returns None: no state is added and enter_state fails.
class TestCase43(MyTestCase, StateGenerator):
    def get_new_edge(self, socket, config):
        assert config.TestCase43["local_kwarg"] == "world"
        return None
    def execute(self, socket, state, **kwargs):
        return True
    def get_transition_function(self, socket, edge):
        raise NotImplementedError()

tce = Scanner(MockSock(), test_cases=[TestCase43],
              TestCase43_kwargs={"local_kwarg": "world"})

assert len(tce.final_states) == 1

tce.execute_test_case(TestCase43())

assert len(tce.final_states) == 1
assert EcuState(session=1) in tce.final_states
assert not tce.enter_state(EcuState(session=1), EcuState(session=2))


= TestCase execute StateGenerator with cleanupfunc

transition_done = False
cleanup_done = False

def transition_func(sock, conf, kwargs):
    assert kwargs["arg42"] == "hello"
    assert conf.TestCase43["local_kwarg"] == "world"
    global transition_done
    transition_done = True
    return True

# Cleanup callback; receives the socket and the executor configuration.
def cleanup_func(sock, conf):
    assert conf.TestCase43["local_kwarg"] == "world"
    global cleanup_done
    cleanup_done = True
    return True

class TestCase43(MyTestCase, StateGenerator):
    def get_new_edge(self, socket, config):
        assert config.TestCase43["local_kwarg"] == "world"
        return EcuState(session=1), EcuState(session=2)
    def get_transition_function(self, socket, edge):
        assert edge[0] == EcuState(session=1)
        assert edge[1] == EcuState(session=2)
        return transition_func, {"arg42": "hello"}, cleanup_func
    def execute(self, socket, state, **kwargs):
        return True


tce = Scanner(MockSock(), test_cases=[TestCase43],
              TestCase43_kwargs={"local_kwarg": "world"})

assert len(tce.final_states) == 1

tce.execute_test_case(TestCase43())

# The cleanup function is registered by enter_state and both executed and
# removed by cleanup_state.
assert len(tce.final_states) == 2
assert EcuState(session=1) in tce.final_states and EcuState(session=2) in tce.final_states
assert not len(tce.cleanup_functions)
assert tce.enter_state(EcuState(session=1), EcuState(session=2))
assert transition_done
assert len(tce.cleanup_functions)
tce.cleanup_state()
assert not len(tce.cleanup_functions)
assert cleanup_done


= TestCase execute StateGenerator with not callable cleanupfunc

transition_done = False

def transition_func(sock, conf, kwargs):
    assert kwargs["arg42"] == "hello"
    assert conf.TestCase43["local_kwarg"] == "world"
    global transition_done
    transition_done = True
    return True

# The cleanup slot holds the string "fake" instead of a callable.
class TestCase43(MyTestCase, StateGenerator):
    def get_new_edge(self, socket, config):
        assert config.TestCase43["local_kwarg"] == "world"
        return EcuState(session=1), EcuState(session=2)
    def get_transition_function(self, socket, edge):
        assert edge[0] == EcuState(session=1)
        assert edge[1] == EcuState(session=2)
        return transition_func, {"arg42": "hello"}, "fake"
    def execute(self, socket, state, **kwargs):
        return True


tce = Scanner(MockSock(), test_cases=[TestCase43],
              TestCase43_kwargs={"local_kwarg": "world"})

assert len(tce.final_states) == 1

tce.execute_test_case(TestCase43())

# The non-callable entry is still registered and cleanup_state() must remove
# it without raising.
assert len(tce.final_states) == 2
assert EcuState(session=1) in tce.final_states and EcuState(session=2) in tce.final_states
assert not len(tce.cleanup_functions)
assert tce.enter_state(EcuState(session=1), EcuState(session=2))
assert transition_done
assert len(tce.cleanup_functions)
tce.cleanup_state()
assert not len(tce.cleanup_functions)

= TestCase execute StateGenerator with cleanupfunc negative return

transition_done = False
cleanup_done = False

def transition_func(sock, conf, kwargs):
    assert kwargs["arg42"] == "hello"
    assert conf.TestCase43["local_kwarg"] == "world"
    global transition_done
    transition_done = True
    return True

# Same as the earlier cleanup function, but it returns False.
def cleanup_func(sock, conf):
    assert conf.TestCase43["local_kwarg"] == "world"
    global cleanup_done
    cleanup_done = True
    return False

class TestCase43(MyTestCase, StateGenerator):
    def get_new_edge(self, socket, config):
        assert config.TestCase43["local_kwarg"] == "world"
        return EcuState(session=1), EcuState(session=2)
    def get_transition_function(self, socket, edge):
        assert edge[0] == EcuState(session=1)
        assert edge[1] == EcuState(session=2)
        return transition_func, {"arg42": "hello"}, cleanup_func
    def execute(self, socket, state, **kwargs):
        return True


tce = Scanner(MockSock(), test_cases=[TestCase43],
              TestCase43_kwargs={"local_kwarg": "world"})

assert len(tce.final_states) == 1

tce.execute_test_case(TestCase43())

# Even with a False return value the cleanup function is called and removed.
assert len(tce.final_states) == 2
assert EcuState(session=1) in tce.final_states and EcuState(session=2) in tce.final_states
assert not len(tce.cleanup_functions)
assert tce.enter_state(EcuState(session=1), EcuState(session=2))
assert transition_done
assert len(tce.cleanup_functions)
tce.cleanup_state()
assert not len(tce.cleanup_functions)
assert cleanup_done


= TestCase execute StateGenerator with cleanupfunc and path

transition_done1 = False
cleanup_done1 = False
transition_done2 = False
cleanup_done2 = False

# When set to True, transition_func2 reports failure (used by the
# "Test downrate edge" test).
transition_error = False


def transition_func1(sock, conf, kwargs):
    global transition_done1
    transition_done1 = True
    return True

def cleanup_func1(sock, conf):
    global cleanup_done1
    cleanup_done1 = True
    return True

def transition_func2(sock, conf, kwargs):
    global transition_done2
    transition_done2 = True
    return not transition_error

def cleanup_func2(sock, conf):
    global cleanup_done2
    cleanup_done2 = True
    return True

# Edge session 1 -> session 2.
class TestCase43(MyTestCase, StateGenerator):
    def get_new_edge(self, socket, config):
        return EcuState(session=1), EcuState(session=2)
    def get_transition_function(self, socket, edge):
        return transition_func1, {"arg42": "hello"}, cleanup_func1
    def execute(self, socket, state, **kwargs):
        return True

# Edge session 2 -> session 3; passes None as transition kwargs.
class TestCase44(MyTestCase, StateGenerator):
    def get_new_edge(self, socket, config):
        return EcuState(session=2), EcuState(session=3)
    def get_transition_function(self, socket, edge):
        return transition_func2, None, cleanup_func2
    def execute(self, socket, state, **kwargs):
        return True

reset_done = False

def reset_func():
    global reset_done
    reset_done = True

reconnect_done = False

def reconnect_func():
    global reconnect_done
    reconnect_done = True
    return MockSock()

tce = Scanner(MockSock(), test_cases=[TestCase43, TestCase44],
              reset_handler=reset_func, reconnect_handler=reconnect_func)

assert len(tce.final_states) == 1

tce.execute_test_case(TestCase43())

assert len(tce.final_states) == 2
assert EcuState(session=1) in tce.final_states and EcuState(session=2) in tce.final_states
tce.execute_test_case(TestCase44())
assert len(tce.final_states) == 3
assert EcuState(session=3) in tce.final_states and EcuState(session=2) in tce.final_states

# Walking the full path runs both transitions, registers both cleanup
# functions and also triggers the reset and reconnect handlers.
assert not len(tce.cleanup_functions)
assert tce.enter_state_path([EcuState(session=1), EcuState(session=2), EcuState(session=3)])
assert transition_done1
assert transition_done2
assert len(tce.cleanup_functions) == 2
assert reconnect_done
assert reset_done
tce.cleanup_state()
assert cleanup_done1
assert cleanup_done2

# A path that starts at session 3 must be rejected with Scapy_Exception; the
# `assert False` is reached only if no exception was raised.
try:
    tce.enter_state_path([EcuState(session=3)])
    assert False
except Scapy_Exception:
    assert True

= Test downrate edge

transition_done1 = False
cleanup_done1 = False

tce = Scanner(MockSock(), test_cases=[TestCase43, TestCase44],
              reset_handler=reset_func, reconnect_handler=reconnect_func)

assert len(tce.final_states) == 1
tce.execute_test_case(TestCase43())
assert len(tce.final_states) == 2
assert EcuState(session=1) in tce.final_states and EcuState(session=2) in tce.final_states
tce.execute_test_case(TestCase44())
assert len(tce.final_states) == 3
assert EcuState(session=3) in tce.final_states and EcuState(session=2) in tce.final_states

# From here on transition_func2 (edge 2 -> 3) reports failure.
assert not len(tce.cleanup_functions)
transition_error = True
# Continuation of "Test downrate edge": the path fails at its second edge.
# The first transition and its cleanup have still run, no cleanup function is
# left over, and the failing edge ends up with weight 2 versus 1 for the edge
# that worked.
assert not tce.enter_state_path([EcuState(session=1), EcuState(session=2), EcuState(session=3)])
assert transition_done1
assert cleanup_done1
assert len(tce.cleanup_functions) == 0
assert tce.state_graph.weights[(EcuState(session=1), EcuState(session=2))] == 1
assert tce.state_graph.weights[(EcuState(session=2), EcuState(session=3))] == 2


= TestCase execute TestCaseGenerator

tc_executed = False

# Test case handed out by the generator below; records that it was executed
# and checks that it received its own kwargs.
class GeneratedTestCase(MyTestCase):
    def execute(self, socket, state, **kwargs):
        assert kwargs["local_kwarg"] == "world"
        global tc_executed
        tc_executed = True
        return True


class TestCase43(MyTestCase, TestCaseGenerator):
    def execute(self, socket, state, **kwargs):
        return True
    def get_generated_test_case(self):
        return GeneratedTestCase()


# The kwargs for the generated class are supplied up front, although only the
# generator is listed in test_cases.
tce = Scanner(MockSock(), test_cases=[TestCase43],
              GeneratedTestCase_kwargs={"local_kwarg": "world"})

assert len(tce.final_states) == 1
assert len(tce.configuration.test_cases) == 1

# Executing the generator appends the generated test case to the list.
tce.execute_test_case(tce.configuration.test_cases[0])

assert len(tce.configuration.test_cases) == 2

tce.execute_test_case(tce.configuration.test_cases[1])

assert tc_executed

= TestCase scan timeout

tc_executed = False

class GeneratedTestCase(MyTestCase):
    def execute(self, socket, state, **kwargs):
        assert kwargs["local_kwarg"] == "world"
        global tc_executed
        tc_executed = True
        return True


tce = Scanner(MockSock(), test_cases=[GeneratedTestCase],
              GeneratedTestCase_kwargs={"local_kwarg": "world"})

assert len(tce.final_states) == 1
assert len(tce.configuration.test_cases) == 1

# scan(-1): the argument is presumably a timeout; with a negative value no
# test case is executed at all.
tce.scan(-1)

assert not tc_executed


= TestCase scan

tc_executed = False

# Both test cases mark the state as completed themselves so that scan() can
# terminate with scan_completed set.
class GeneratedTestCase(MyTestCase):
    def execute(self, socket, state, **kwargs):
        assert kwargs["local_kwarg"] == "world"
        global tc_executed
        tc_executed = True
        self._state_completed[state] = True
        return True


class TestCase43(MyTestCase, TestCaseGenerator):
    def execute(self, socket, state, **kwargs):
        self._state_completed[state] = True
        return True
    def get_generated_test_case(self):
        return GeneratedTestCase()


tce = Scanner(MockSock(), test_cases=[TestCase43],
              GeneratedTestCase_kwargs={"local_kwarg": "world"})

assert len(tce.final_states) == 1
assert len(tce.configuration.test_cases) == 1

tce.scan()

assert len(tce.configuration.test_cases) == 2
assert tc_executed
assert tce.scan_completed

= Test supported responses

# Two test cases with fixed supported_responses: two positive responses and
# one negative response (service 0x7f) each.
# NOTE(review): `threading` is not imported in the visible part of this file;
# presumably the test environment provides it -- confirm.
class MyTestCase1(AutomotiveTestCase):
    _description = "MyTestCase1"
    _supported_kwargs = copy.copy(AutomotiveTestCase._supported_kwargs)
    _supported_kwargs.update({
        'stop_event': (threading.Event, None), # type: ignore
    })
    @property
    def supported_responses(self):
        return [EcuResponse(EcuState(session=2), responses=UDS() / UDS_RDBIPR(dataIdentifier=2) / Raw(b"de")),
                EcuResponse([EcuState(session=2), EcuState(security_level=6)], responses=UDS() / UDS_RDBIPR(dataIdentifier=3) / Raw(b"dea2")),
                EcuResponse(EcuState(session=range(0,255)), responses=UDS() / UDS_NR(negativeResponseCode=0x7f, requestServiceId=0x13))]


class MyTestCase2(AutomotiveTestCase):
    _description = "MyTestCase2"
    _supported_kwargs = copy.copy(AutomotiveTestCase._supported_kwargs)
    _supported_kwargs.update({
        'stop_event': (threading.Event, None), # type: ignore
    })
    @property
    def supported_responses(self):
        return [EcuResponse(EcuState(session=2), responses=UDS() / UDS_RDBIPR(dataIdentifier=5) / Raw(b"deadbeef1")),
                EcuResponse(EcuState(session=2), responses=UDS() / UDS_RDBIPR(dataIdentifier=6) / Raw(b"deadbeef2")),
                EcuResponse(EcuState(session=range(0,255)), responses=UDS() / UDS_NR(negativeResponseCode=0x10, requestServiceId=0x11))]


# Here the test cases are passed as instances rather than classes.
tce = Scanner(MockSock(), test_cases=[MyTestCase1(), MyTestCase2()])

resps = tce.supported_responses

# The executor merges both lists: the four positive responses come first and
# the two negative responses last.
assert len(resps) == 6
assert resps[0].responses[0].service != 0x7f
assert resps[1].responses[0].service != 0x7f
assert resps[2].responses[0].service != 0x7f
assert resps[3].responses[0].service != 0x7f

assert resps[4].responses[0].service == 0x7f
assert resps[5].responses[0].service == 0x7f

# Expected order of the positive responses; the shortest payload b"de" is
# last. NOTE(review): the sort key is not visible here -- confirm against
# the executor before relying on this order.
assert resps[0].responses[0].load == b"dea2"
assert resps[1].responses[0].load == b"deadbeef1"
assert resps[2].responses[0].load == b"deadbeef2"
assert resps[3].responses[0].load == b"de"
assert resps[4].responses[0].requestServiceId == 0x13
assert resps[5].responses[0].requestServiceId == 0x11

= Test show testcases

# Smoke tests: both calls only have to finish without raising.
# NOTE(review): `except Exception: assert False` hides the original
# exception and traceback; a plain call would fail more informatively.
try:
    tce.show_testcases()
    assert True
except Exception:
    assert False


try:
    tce.show_testcases_status()
    assert True
except Exception:
    assert False

= Test StateGeneratingServiceEnumerator

# The results property is overridden with one fixed result: request
# UDS()/UDS_DSC(b"\x03") answered by UDS()/UDS_DSCPR(b"\x03") in session 1.
# The asserts below expect the new state session 3 to be derived from it.
class TestCase43(MyTestCase, StateGeneratingServiceEnumerator):
    def execute(self, socket, state, **kwargs):
        return True
    @property
    def results(self): # type: () -> List[_AutomotiveTestCaseScanResult]
        return [_AutomotiveTestCaseScanResult(EcuState(session=1), UDS()/UDS_DSC(b"\x03"), UDS()/UDS_DSCPR(b"\x03"), 1.1, 1.2)]

tce = Scanner(MockSock(), test_cases=[TestCase43])

assert len(tce.final_states) == 1

tce.execute_test_case(TestCase43())

assert len(tce.final_states) == 2
assert EcuState(session=1) in tce.final_states and EcuState(session=3) in tce.final_states

# The edge carries (transition function, its args, cleanup function).
tf, args, cf = tce.state_graph.get_transition_tuple_for_edge((EcuState(session=1), EcuState(session=3)))

# No cleanup function; the args hold the request to replay under "req" and a
# textual description under "desc".
assert cf is None
assert tf is not None
assert len(args) == 2
assert args["req"] == UDS()/UDS_DSC(b"\x03")
assert "diagnosticSessionType" in args["desc"] and "extendedDiagnosticSession" in args["desc"]

# Entering the state fails here; MockSock.sr1 always raises OSError.
assert not tce.enter_state(EcuState(session=1), EcuState(session=3))
