1% Regression tests for Simulated ECUs and UDS Scanners 2~ scanner 3 4+ Configuration 5~ conf 6 7= Imports 8import io 9import pickle 10from scapy.contrib.isotp import ISOTPMessageBuilder 11from test.testsocket import TestSocket, cleanup_testsockets, UnstableSocket 12from scapy.automaton import ObjectPipe 13 14############ 15############ 16+ Load general modules 17 18= Load contribution layer 19 20 21from scapy.contrib.automotive.uds import * 22from scapy.contrib.automotive.uds_ecu_states import * 23from scapy.contrib.automotive.uds_scan import * 24from scapy.contrib.automotive.ecu import * 25 26load_layer("can") 27 28conf.debug_dissector = False 29 30 31= Define Testfunction 32 33def executeScannerInVirtualEnvironment(supported_responses, enumerators, unstable_socket=True, **kwargs): 34 tester_obj_pipe = ObjectPipe(name="TesterPipe") 35 ecu_obj_pipe = ObjectPipe(name="ECUPipe") 36 TesterSocket = UnstableSocket if unstable_socket else TestSocket 37 tester = TesterSocket(UDS, tester_obj_pipe) 38 ecu = TestSocket(UDS, ecu_obj_pipe) 39 tester.pair(ecu) 40 answering_machine = EcuAnsweringMachine( 41 supported_responses=supported_responses, main_socket=ecu, 42 basecls=UDS, verbose=False) 43 def reset(): 44 answering_machine.state.reset() 45 answering_machine.state["session"] = 1 46 sniff(timeout=0.001, opened_socket=[ecu, tester]) 47 def reconnect(): 48 try: 49 tester.close() 50 except Exception: 51 pass 52 tester = TesterSocket(UDS, tester_obj_pipe) 53 ecu.pair(tester) 54 return tester 55 def answering_machine_thread(): 56 answering_machine( 57 timeout=120, stop_filter=lambda x: bytes(x) == b"\xff\xff\xff") 58 sim = threading.Thread(target=answering_machine_thread) 59 try: 60 sim.start() 61 scanner = UDS_Scanner( 62 tester, reset_handler=reset, reconnect_handler=reconnect, 63 test_cases=enumerators, timeout=0.1, 64 retry_if_none_received=True, unittest=True, 65 **kwargs) 66 for i in range(12): 67 print("Starting scan") 68 scanner.scan(timeout=10) 69 if scanner.scan_completed: 70 print("Scan completed after %d iterations" % i) 71 break 72 finally: 73 ecu.ins.send(Raw(b"\xff\xff\xff")) 74 sim.join(timeout=2) 75 assert not sim.is_alive() 76 cleanup_testsockets() 77 tester_obj_pipe.close() 78 ecu_obj_pipe.close() 79 if LINUX: 80 pickle_test(scanner) 81 return scanner 82 83def pickle_test(scanner): 84 f = io.BytesIO() 85 pickle.dump(scanner, f) 86 unp = pickle.loads(f.getvalue()) 87 assert scanner.scan_completed == unp.scan_completed 88 assert scanner.state_paths == unp.state_paths 89 90= Load packets from pcap 91 92conf.contribs['CAN']['swap-bytes'] = True 93pkts = rdpcap(scapy_path("test/pcaps/candump_uds_scanner.pcap.gz")) 94assert len(pkts) 95 96= Create UDS messages from packets 97 98builder = ISOTPMessageBuilder(basecls=UDS, use_ext_address=False, rx_id=[0x641, 0x651]) 99msgs = list() 100 101for p in pkts: 102 if p.data == b"ECURESET": 103 msgs.append(p) 104 else: 105 builder.feed(p) 106 if len(builder): 107 msgs.append(builder.pop()) 108 109assert len(msgs) 110 111= Create ECU-Clone from packets 112 113mEcu = Ecu(logging=False, verbose=False, store_supported_responses=True, lookahead=3) 114 115for p in msgs: 116 if isinstance(p, CAN) and p.data == b"ECURESET": 117 mEcu.reset() 118 else: 119 mEcu.update(p) 120 121assert len(mEcu.supported_responses) 122 123= Test UDS_SAEnumerator evaluate_response 124 125e = UDS_SAEnumerator() 126 127config = {} 128 129s = EcuState(session=1) 130 131assert False == e._evaluate_response(s, UDS(b"\x27\x01"), None, **config) 132config = {"exit_if_service_not_supported": True} 133assert not e._retry_pkt[s] 134assert True == e._evaluate_response(s, UDS(b"\x27\x01"), UDS(b"\x7f\x27\x11"), **config) 135assert not e._retry_pkt[s] 136assert True == e._evaluate_response(s, UDS(b"\x27\x01"), UDS(b"\x7f\x27\x24"), **config) 137assert e._retry_pkt[s] == UDS(b"\x27\x01") 138assert False == e._evaluate_response(s, UDS(b"\x27\x02"), UDS(b"\x7f\x27\x24"), **config) 139assert not e._retry_pkt[s] 140assert True == e._evaluate_response(s, UDS(b"\x27\x01"), UDS(b"\x7f\x27\x37"), **config) 141assert e._retry_pkt[s] == UDS(b"\x27\x01") 142assert False == e._evaluate_response(s, UDS(b"\x27\x01"), UDS(b"\x7f\x27\x37"), **config) 143assert not e._retry_pkt[s] 144assert True == e._evaluate_response(s, UDS(b"\x27\x01"), UDS(b"\x67\x01ab"), **config) 145assert not e._retry_pkt[s] 146assert False == e._evaluate_response(s, UDS(b"\x27\x01"), UDS(b"\x67\x02ab"), **config) 147assert not e._retry_pkt[s] 148 149 150= Test UDS_SA_XOR_Enumerator stand alone mode 151 152TesterSocket = TestSocket 153ecu_sock = TestSocket(UDS) 154mTester = TesterSocket(UDS) 155ecu_sock.pair(mTester) 156answering_machine = EcuAnsweringMachine(supported_responses=mEcu.supported_responses, main_socket=ecu_sock, basecls=UDS, verbose=False) 157sim = threading.Thread(target=answering_machine, kwargs={'timeout': 1000, "stop_filter": lambda x: bytes(x) == b"\xff\xff\xff"}) 158sim.start() 159try: 160 resp = mTester.sr1(UDS()/UDS_TP(b"\x00"), verbose=False, timeout=1) 161 print(repr(resp)) 162 assert resp and resp.service != 0x7f 163 resp = mTester.sr1(UDS()/UDS_DSC(diagnosticSessionType=3), verbose=False, timeout=1) 164 print(repr(resp)) 165 assert resp and resp.service != 0x7f 166 assert UDS_SA_XOR_Enumerator().get_security_access(mTester, 1) 167finally: 168 mTester.send(Raw(b"\xff\xff\xff")) 169 sim.join(timeout=2) 170 cleanup_testsockets() 171 172 173= Test configuration validation 174 175try: 176 scanner = UDS_Scanner(TestSocket(UDS), 177 test_cases=[UDS_SA_XOR_Enumerator, UDS_DSCEnumerator, UDS_ServiceEnumerator], 178 UDS_DSCEnumerator_kwargs={"scan_range": range(0x1000), "delay_state_change": 0, 179 "overwrite_timeout": False}) 180 assert False 181except Scapy_Exception: 182 pass 183 184= Simulate ECU and run Scanner 185 186scanner = executeScannerInVirtualEnvironment( 187 mEcu.supported_responses, 188 [UDS_SA_XOR_Enumerator, UDS_DSCEnumerator, UDS_ServiceEnumerator], 189 UDS_DSCEnumerator_kwargs={"scan_range": range(5), "delay_state_change": 0, 190 "overwrite_timeout": False}, 191 UDS_SA_XOR_Enumerator_kwargs={"scan_range": range(5)}, 192 UDS_ServiceEnumerator_kwargs={"scan_range": [0x10, 0x11, 0x14, 0x19, 0x22, 193 0x23, 0x24, 0x27, 0x28, 0x29, 194 0x2A, 0x2C, 0x2E, 0x2F, 0x31, 195 0x34, 0x35, 0x36, 0x37, 0x38, 196 0x3D, 0x3E, 0x83, 0x84, 0x85, 197 0x87], 198 "request_length": 1}) 199 200scanner.show_testcases() 201scanner.show_testcases_status() 202assert len(scanner.state_paths) == 5 203assert scanner.scan_completed 204assert scanner.progress() > 0.95 205 206assert EcuState(session=1) in scanner.final_states 207assert EcuState(session=2, tp=1) in scanner.final_states 208assert EcuState(session=3, tp=1) in scanner.final_states 209assert EcuState(session=2, tp=1, security_level=2) in scanner.final_states 210assert EcuState(session=3, tp=1, security_level=2) in scanner.final_states 211 212#################### UDS_SA_XOR_Enumerator ################ 213tc = scanner.configuration.test_cases[0] 214 215assert len(tc.results_without_response) < 10 216if tc.results_without_response: 217 tc.show() 218 219assert len(tc.results_with_negative_response) == 19 220assert len(tc.results_with_positive_response) >= 6 221assert len(tc.scanned_states) == 5 222 223result = tc.show(dump=True) 224 225assert "serviceNotSupportedInActiveSession received 5 times" in result 226assert "incorrectMessageLengthOrInvalidFormat received 14 times" in result 227 228################# UDS_DSCEnumerator ##################### 229tc = scanner.configuration.test_cases[1] 230 231assert len(tc.results_without_response) < 10 232if tc.results_without_response: 233 tc.show() 234 235assert len(tc.results_with_negative_response) == 20 236assert len(tc.results_with_positive_response) == 5 237assert len(tc.scanned_states) == 5 238 239result = tc.show(dump=True) 240 241assert "incorrectMessageLengthOrInvalidFormat received 20 times" in result 242 243###################### UDS_ServiceEnumerator ################### 244tc = scanner.configuration.test_cases[2] 245 246assert len(tc.results_without_response) < 10 247if tc.results_without_response: 248 tc.show() 249 250assert len(tc.results_with_negative_response) == 130 251assert len(tc.results_with_positive_response) == 0 252assert len(tc.scanned_states) == 5 253 254result = tc.show(dump=True) 255 256assert "incorrectMessageLengthOrInvalidFormat received 34 times" in result 257assert "serviceNotSupported received 75 times" in result 258assert "serviceNotSupportedInActiveSession received 19 times" in result 259assert "securityAccessDenied received 2 times" in result 260 261= UDS_ServiceEnumerator 262 263def req_handler(resp, req): 264 if req.service != 0x22: 265 return False 266 if len(req) == 1: 267 resp.negativeResponseCode="generalReject" 268 return True 269 if len(req) == 2: 270 resp.negativeResponseCode="incorrectMessageLengthOrInvalidFormat" 271 return True 272 if len(req) == 3: 273 resp.negativeResponseCode="requestOutOfRange" 274 return True 275 return False 276 277resps = [EcuResponse(None, [UDS()/UDS_NR(negativeResponseCode="subFunctionNotSupported", requestServiceId="ReadDataByIdentifier")], req_handler)] 278 279es = [UDS_ServiceEnumerator] 280 281debug_dissector_backup = conf.debug_dissector 282 283# This Enumerator is sending corrupted Packets, therefore we need to disable the debug_dissector 284conf.debug_dissector = False 285scanner = executeScannerInVirtualEnvironment( 286 resps, es, UDS_ServiceEnumerator_kwargs={"request_length": 3}, unstable_socket=False) 287conf.debug_dissector = debug_dissector_backup 288 289assert scanner.scan_completed 290assert scanner.progress() > 0.95 291tc = scanner.configuration.test_cases[0] 292 293assert len(tc.results_without_response) < 10 294if tc.results_without_response: 295 tc.show() 296 297tc.show() 298 299assert len(tc.results_with_negative_response) == 128 * 3 300assert len(tc.results_with_positive_response) == 0 301assert len(tc.scanned_states) == 1 302 303result = tc.show(dump=True) 304 305assert "incorrectMessageLengthOrInvalidFormat" in result 306assert "requestOutOfRange" in result 307 308= UDS_RDBIEnumerator 309 310resps = [EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=1)/Raw(b"asdfbeef1")]), 311 EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=2)/Raw(b"beef2")]), 312 EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=3)/Raw(b"beef3")]), 313 EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0xff)/Raw(b"beefff")]), 314 EcuResponse(None, [UDS()/UDS_NR(negativeResponseCode="subFunctionNotSupported", requestServiceId="ReadDataByIdentifier")])] 315 316es = [UDS_RDBIEnumerator] 317 318scanner = executeScannerInVirtualEnvironment( 319 resps, es, UDS_RDBIEnumerator_kwargs={"scan_range": range(0x100)}) 320 321assert scanner.scan_completed 322assert scanner.progress() > 0.95 323tc = scanner.configuration.test_cases[0] 324 325assert len(tc.results_without_response) < 10 326if tc.results_without_response: 327 tc.show() 328 329assert len(tc.results_with_negative_response) == 256 - 4 330assert len(tc.results_with_positive_response) == 4 331assert len(tc.scanned_states) == 1 332 333result = tc.show(dump=True) 334 335assert "asdfbeef1" in result 336assert "beef2" in result 337assert "beef3" in result 338assert "beefff" in result 339assert "subFunctionNotSupported received" in result 340 341ids = [t.req.identifiers[0] for t in tc.results_with_positive_response] 342 343assert 1 in ids 344assert 2 in ids 345assert 3 in ids 346assert 0xff in ids 347 348= UDS_RDBISelectiveEnumerator 349~ not_pypy 350 351resps = [EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x101)/Raw(b"asdfbeef1")]), 352 EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x102)/Raw(b"beef2")]), 353 EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x103)/Raw(b"beef3")]), 354 EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x104)/Raw(b"beef3")]), 355 EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x105)/Raw(b"beef3")]), 356 EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x106)/Raw(b"beef3")]), 357 EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x107)/Raw(b"beef3")]), 358 EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x108)/Raw(b"beef3")]), 359 EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x109)/Raw(b"beef3")]), 360 EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x110)/Raw(b"beef3")]), 361 EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x111)/Raw(b"beef3")]), 362 EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x112)/Raw(b"beef3")]), 363 EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x113)/Raw(b"beef3")]), 364 EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x114)/Raw(b"beef3")]), 365 EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x115)/Raw(b"beef3")]), 366 EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x116)/Raw(b"beef3")]), 367 EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x117)/Raw(b"beef3")]), 368 EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x118)/Raw(b"beef3")]), 369 EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x119)/Raw(b"beef3")]), 370 EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x120)/Raw(b"beef3")]), 371 EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x121)/Raw(b"beef3")]), 372 EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x122)/Raw(b"beef3")]), 373 EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x123)/Raw(b"beef3")]), 374 EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x124)/Raw(b"beef3")]), 375 EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x125)/Raw(b"beef3")]), 376 EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x126)/Raw(b"beef3")]), 377 EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x127)/Raw(b"beef3")]), 378 EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x128)/Raw(b"beef3")]), 379 EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x129)/Raw(b"beef3")]), 380 EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x130)/Raw(b"beef3")]), 381 EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x131)/Raw(b"beef3")]), 382 EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x132)/Raw(b"beef3")]), 383 EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x133)/Raw(b"beef3")]), 384 EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x134)/Raw(b"beef3")]), 385 EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x135)/Raw(b"beef35")]), 386 EcuResponse(None, [UDS()/UDS_NR(negativeResponseCode="subFunctionNotSupported", requestServiceId="ReadDataByIdentifier")])] 387 388es = [UDS_RDBISelectiveEnumerator] 389 390scanner = executeScannerInVirtualEnvironment(resps, es, UDS_RDBIRandomEnumerator_kwargs={"probe_start": 0, "probe_end": 0x500}) 391 392assert scanner.scan_completed 393assert scanner.progress() > 0.95 394 395tc = scanner.configuration.stages[0][0] 396 397tc.show() 398 399assert len(tc.results_without_response) < 10 400if tc.results_without_response: 401 tc.show() 402 403assert len(tc.results_with_negative_response) > 100 404assert len(tc.results_with_positive_response) >= 1 405assert len(tc.scanned_states) == 1 406 407assert scanner.scan_completed 408assert scanner.progress() > 0.95 409 410tc = scanner.configuration.stages[0][1] 411 412tc.show() 413 414assert len(tc.results_without_response) < 10 415if tc.results_without_response: 416 tc.show() 417 418assert len(tc.results_with_negative_response) == 29 419assert len(tc.results_with_positive_response) == 35 420assert len(tc.scanned_states) == 1 421 422result = tc.show(dump=True) 423 424assert "asdfbeef1" in result 425assert "beef2" in result 426assert "beef3" in result 427assert "beef35" in result 428assert "subFunctionNotSupported received" in result 429 430ids = [t.req.identifiers[0] for t in tc.results_with_positive_response] 431 432assert 0x101 in ids 433assert 0x102 in ids 434assert 0x103 in ids 435assert 0x135 in ids 436 437= UDS_WDBIEnumerator 438 439def wdbi_handler(resp, req): 440 if req.service != 0x2E: 441 return False 442 assert req.dataIdentifier in [1, 2, 3, 0xff] 443 resp.dataIdentifier = req.dataIdentifier 444 if req.dataIdentifier == 1: 445 assert req.load == b'asdfbeef1' 446 return True 447 if req.dataIdentifier == 2: 448 assert req.load == b'beef2' 449 return True 450 if req.dataIdentifier == 3: 451 assert req.load == b"beef3" 452 return True 453 if req.dataIdentifier == 0xff: 454 assert req.load == b"beefff" 455 return True 456 return False 457 458resps = [EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=1)/Raw(b"asdfbeef1")]), 459 EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=2)/Raw(b"beef2")]), 460 EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=3)/Raw(b"beef3")]), 461 EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0xff)/Raw(b"beefff")]), 462 EcuResponse(None, [UDS()/UDS_WDBIPR()], answers=wdbi_handler), 463 EcuResponse(None, [UDS()/UDS_NR(negativeResponseCode="subFunctionNotSupported", requestServiceId="ReadDataByIdentifier")])] 464 465es = [UDS_WDBISelectiveEnumerator()] 466 467scanner = executeScannerInVirtualEnvironment( 468 resps, es, UDS_RDBIEnumerator_kwargs={"scan_range": range(0x100)}) 469 470assert scanner.scan_completed 471assert scanner.progress() > 0.95 472 473tc = scanner.configuration.stages[0][0] 474 475assert len(tc.results_without_response) < 10 476if tc.results_without_response: 477 tc.show() 478 479assert len(tc.results_with_negative_response) == 256 - 4 480assert len(tc.results_with_positive_response) == 4 481assert len(tc.scanned_states) == 1 482 483result = tc.show(dump=True) 484 485assert "asdfbeef1" in result 486assert "beef2" in result 487assert "beef3" in result 488assert "beefff" in result 489assert "subFunctionNotSupported received" in result 490 491ids = [t.req.identifiers[0] for t in tc.results_with_positive_response] 492 493assert 1 in ids 494assert 2 in ids 495assert 3 in ids 496assert 0xff in ids 497 498######################### WDBI ############################# 499tc = scanner.configuration.stages[0][1] 500 501assert len(tc.results_without_response) < 10 502if tc.results_without_response: 503 tc.show() 504 505assert len(tc.results_with_negative_response) == 0 506assert len(tc.results_with_positive_response) == 4 507assert len(tc.scanned_states) == 1 508 509result = tc.show(dump=True) 510 511ids = [t.req.dataIdentifier for t in tc.results_with_positive_response] 512 513assert 1 in ids 514assert 2 in ids 515assert 3 in ids 516assert 0xff in ids 517 518= UDS_WDBIEnumerator 2 519 520def wdbi_handler(resp, req): 521 if req.service != 0x2E: 522 return False 523 assert req.dataIdentifier in [1, 2, 3, 0xff] 524 resp.dataIdentifier = req.dataIdentifier 525 if req.dataIdentifier == 1: 526 assert req.load == b'asdfbeef1' 527 return True 528 if req.dataIdentifier == 2: 529 assert req.load == b'beef2' 530 return True 531 if req.dataIdentifier == 3: 532 assert req.load == b"beef3" 533 return True 534 if req.dataIdentifier == 0xff: 535 assert req.load == b"beefff" 536 return True 537 return False 538 539resps = [EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=1)/Raw(b"asdfbeef1")]), 540 EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=2)/Raw(b"beef2")]), 541 EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=3)/Raw(b"beef3")]), 542 EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0xff)/Raw(b"beefff")]), 543 EcuResponse(None, [UDS()/UDS_WDBIPR()], answers=wdbi_handler), 544 EcuResponse(None, [UDS()/UDS_NR(negativeResponseCode="subFunctionNotSupported", requestServiceId="ReadDataByIdentifier")])] 545 546es = [UDS_WDBISelectiveEnumerator] 547 548scanner = executeScannerInVirtualEnvironment( 549 resps, es, UDS_RDBIEnumerator_kwargs={"scan_range": range(0x100)}) 550 551assert scanner.scan_completed 552assert scanner.progress() > 0.95 553 554tc = scanner.configuration.test_cases[0][0] 555 556assert len(tc.results_without_response) < 10 557if tc.results_without_response: 558 tc.show() 559 560assert len(tc.results_with_negative_response) == 256 - 4 561assert len(tc.results_with_positive_response) == 4 562assert len(tc.scanned_states) == 1 563 564result = tc.show(dump=True) 565 566assert "asdfbeef1" in result 567assert "beef2" in result 568assert "beef3" in result 569assert "beefff" in result 570assert "subFunctionNotSupported received" in result 571 572ids = [t.req.identifiers[0] for t in tc.results_with_positive_response] 573 574assert 1 in ids 575assert 2 in ids 576assert 3 in ids 577assert 0xff in ids 578 579######################### WDBI ############################# 580tc = scanner.configuration.test_cases[0][1] 581 582assert len(tc.results_without_response) < 10 583if tc.results_without_response: 584 tc.show() 585 586assert len(tc.results_with_negative_response) == 0 587assert len(tc.results_with_positive_response) == 4 588assert len(tc.scanned_states) == 1 589 590result = tc.show(dump=True) 591 592ids = [t.req.dataIdentifier for t in tc.results_with_positive_response] 593 594assert 1 in ids 595assert 2 in ids 596assert 3 in ids 597assert 0xff in ids 598 599 600= UDS_TPEnumerator 601 602resps = [EcuResponse(None, [UDS()/UDS_TPPR()]), 603 EcuResponse(None, [UDS()/UDS_NR(negativeResponseCode="serviceNotSupported", requestServiceId="TesterPresent")])] 604 605es = [UDS_TPEnumerator] 606 607scanner = executeScannerInVirtualEnvironment(resps, es) 608 609assert scanner.scan_completed 610assert scanner.progress() > 0.95 611 612tc = scanner.configuration.test_cases[0] 613 614assert len(tc.results_without_response) < 10 615if tc.results_without_response: 616 tc.show() 617 618assert len(tc.results_with_negative_response) == 0 619assert len(tc.results_with_positive_response) == 2 620assert len(tc.scanned_states) == 2 621 622assert tc.show(dump=True) 623 624= UDS_EREnumerator 625 626resps = [EcuResponse(None, [UDS()/UDS_ERPR(resetType=1)]), 627 EcuResponse(None, [UDS()/UDS_ERPR(resetType=3)]), 628 EcuResponse(None, [UDS()/UDS_NR(negativeResponseCode="subFunctionNotSupported", requestServiceId="ECUReset")])] 629 630es = [UDS_EREnumerator] 631 632scanner = executeScannerInVirtualEnvironment(resps, es) 633 634assert scanner.scan_completed 635assert scanner.progress() > 0.95 636 637tc = scanner.configuration.test_cases[0] 638 639assert len(tc.results_without_response) < 10 640if tc.results_without_response: 641 tc.show() 642 643assert len(tc.results_with_negative_response) == 256 - 2 644assert len(tc.results_with_positive_response) == 2 645assert len(tc.scanned_states) == 1 646 647result = tc.show(dump=True) 648 649assert "hardReset" in result 650assert "softReset" in result 651assert "subFunctionNotSupported received" in result 652 653ids = [t.req.resetType for t in tc.results_with_positive_response] 654 655assert 1 in ids 656assert 3 in ids 657 658 659= UDS_CCEnumerator 660 661resps = [EcuResponse(None, [UDS()/UDS_CCPR(controlType=1)]), 662 EcuResponse(None, [UDS()/UDS_CCPR(controlType=3)]), 663 EcuResponse(None, [UDS()/UDS_NR(negativeResponseCode="subFunctionNotSupported", requestServiceId="CommunicationControl")])] 664 665es = [UDS_CCEnumerator] 666 667scanner = executeScannerInVirtualEnvironment(resps, es, inter=0.001) 668 669assert scanner.scan_completed 670assert scanner.progress() > 0.95 671 672tc = scanner.configuration.test_cases[0] 673 674assert len(tc.results_without_response) < 10 675if tc.results_without_response: 676 tc.show() 677 678assert len(tc.results_with_negative_response) == 256 - 2 679assert len(tc.results_with_positive_response) == 2 680assert len(tc.scanned_states) == 1 681 682result = tc.show(dump=True) 683 684assert "enableRxAndDisableTx" in result 685assert "disableRxAndTx" in result 686assert "subFunctionNotSupported received" in result 687 688ids = [t.req.controlType for t in tc.results_with_positive_response] 689 690assert 1 in ids 691assert 3 in ids 692 693= UDS_RDBPIEnumerator 694 695UDS_RDBPI.periodicDataIdentifiers[1] = "identifierElectric" 696UDS_RDBPI.periodicDataIdentifiers[3] = "identifierGas" 697 698resps = [EcuResponse(None, [UDS()/UDS_RDBPIPR(periodicDataIdentifier=1, dataRecord=b'electric')]), 699 EcuResponse(None, [UDS()/UDS_RDBPIPR(periodicDataIdentifier=3, dataRecord=b'gas')]), 700 EcuResponse(None, [UDS()/UDS_NR(negativeResponseCode="subFunctionNotSupported", requestServiceId="ReadDataPeriodicIdentifier")])] 701 702es = [UDS_RDBPIEnumerator] 703 704scanner = executeScannerInVirtualEnvironment(resps, es) 705 706assert scanner.scan_completed 707assert scanner.progress() > 0.95 708 709tc = scanner.configuration.test_cases[0] 710 711assert len(tc.results_without_response) < 10 712if tc.results_without_response: 713 tc.show() 714 715assert len(tc.results_with_negative_response) == 256 - 2 716assert len(tc.results_with_positive_response) == 2 717assert len(tc.scanned_states) == 1 718 719result = tc.show(dump=True) 720 721assert "electric" in result 722assert "gas" in result 723assert "0x01 identifierElectric" in result 724assert "0x03 identifierGas" in result 725assert "subFunctionNotSupported received" in result 726 727ids = [t.req.periodicDataIdentifier for t in tc.results_with_positive_response] 728 729assert 1 in ids 730assert 3 in ids 731 732 733= UDS_RCEnumerator 734 735resps = [EcuResponse(None, [UDS()/UDS_RCPR(routineControlType=1, routineIdentifier=1)/Raw(b"asdfbeef1")]), 736 EcuResponse(None, [UDS()/UDS_RCPR(routineControlType=2, routineIdentifier=2)/Raw(b"beef2")]), 737 EcuResponse(None, [UDS()/UDS_RCPR(routineControlType=3, routineIdentifier=3)/Raw(b"beef3")]), 738 EcuResponse(None, [UDS()/UDS_RCPR(routineControlType=1, routineIdentifier=0x10)/Raw(b"beefff")]), 739 EcuResponse(None, [UDS()/UDS_NR(negativeResponseCode="subFunctionNotSupported", requestServiceId="RoutineControl")])] 740 741es = [UDS_RCEnumerator] 742 743scanner = executeScannerInVirtualEnvironment(resps, es, UDS_RCEnumerator_kwargs={"scan_range": range(0x11)}) 744 745assert scanner.scan_completed 746assert scanner.progress() > 0.95 747 748tc = scanner.configuration.test_cases[0] 749 750assert len(tc.results_without_response) < 10 751if tc.results_without_response: 752 tc.show() 753 754assert len(tc.results_with_negative_response) == 0x11 * 3 - 4 755assert len(tc.results_with_positive_response) == 4 756assert len(tc.scanned_states) == 1 757 758result = tc.show(dump=True) 759 760assert "subFunctionNotSupported received" in result 761 762ids = [t.req.routineIdentifier for t in tc.results_with_positive_response] 763 764assert 1 in ids 765assert 2 in ids 766assert 3 in ids 767assert 0x10 in ids 768 769 770= UDS_RCSelectiveEnumerator 771 772resps = [EcuResponse(None, [UDS()/UDS_RCPR(routineControlType=1, routineIdentifier=1)/Raw(b"asdfbeef1")]), 773 EcuResponse(None, [UDS()/UDS_RCPR(routineControlType=2, routineIdentifier=1)/Raw(b"beef2")]), 774 EcuResponse(None, [UDS()/UDS_RCPR(routineControlType=3, routineIdentifier=1)/Raw(b"beef3")]), 775 EcuResponse(None, [UDS()/UDS_RCPR(routineControlType=1, routineIdentifier=0x10)/Raw(b"beefff")]), 776 EcuResponse(None, [UDS()/UDS_NR(negativeResponseCode="subFunctionNotSupported", requestServiceId="RoutineControl")])] 777 778es = [UDS_RCSelectiveEnumerator] 779 780scanner = executeScannerInVirtualEnvironment(resps, es, UDS_RCStartEnumerator_kwargs={"scan_range": range(0x11)}) 781 782assert scanner.scan_completed 783assert scanner.progress() > 0.95 784 785tc = scanner.configuration.stages[0][0] 786 787assert len(tc.results_without_response) < 10 788if tc.results_without_response: 789 tc.show() 790 791assert len(tc.results_with_negative_response) == 0x11 - 2 792assert len(tc.results_with_positive_response) == 2 793assert len(tc.scanned_states) == 1 794 795result = tc.show(dump=True) 796 797assert "subFunctionNotSupported received" in result 798 799ids = [t.req.routineIdentifier for t in tc.results_with_positive_response] 800 801assert 1 in ids 802assert 0x10 in ids 803 804tc = scanner.configuration.stages[0][1] 805 806assert len(tc.results_without_response) < 10 807if tc.results_without_response: 808 tc.show() 809 810assert len(tc.results_with_negative_response) == 538 811assert len(tc.results_with_positive_response) == 2 812assert len(tc.scanned_states) == 1 813 814result = tc.show(dump=True) 815 816assert "subFunctionNotSupported received" in result 817 818ids = [t.req.routineIdentifier for t in tc.results_with_positive_response] 819 820assert 1 in ids 821 822= UDS_IOCBIEnumerator 823 824resps = [EcuResponse(None, [UDS()/UDS_IOCBIPR(dataIdentifier=1)/Raw(b"asdfbeef1")]), 825 EcuResponse(None, [UDS()/UDS_IOCBIPR(dataIdentifier=2)/Raw(b"beef2")]), 826 EcuResponse(None, [UDS()/UDS_IOCBIPR(dataIdentifier=3)/Raw(b"beef3")]), 827 EcuResponse(None, [UDS()/UDS_IOCBIPR(dataIdentifier=0xff)/Raw(b"beefff")]), 828 EcuResponse(None, [UDS()/UDS_NR(negativeResponseCode="subFunctionNotSupported", requestServiceId="InputOutputControlByIdentifier")])] 829 830es = [UDS_IOCBIEnumerator] 831 832scanner = executeScannerInVirtualEnvironment(resps, es, UDS_IOCBIEnumerator_kwargs={"scan_range": range(0x100)}) 833 834assert scanner.scan_completed 835assert scanner.progress() > 0.95 836 837tc = scanner.configuration.test_cases[0] 838 839assert len(tc.results_without_response) < 10 840if tc.results_without_response: 841 tc.show() 842 843assert len(tc.results_with_negative_response) == 0x100 - 4 844assert len(tc.results_with_positive_response) == 4 845assert len(tc.scanned_states) == 1 846 847result = tc.show(dump=True) 848 849assert "asdfbeef1" in result 850assert "beef2" in result 851assert "beef3" in result 852assert "beefff" in result 853assert "subFunctionNotSupported received" in result 854 855ids = [t.req.dataIdentifier for t in tc.results_with_positive_response] 856 857assert 1 in ids 858assert 2 in ids 859assert 3 in ids 860assert 0xff in ids 861 862 863= UDS_RDEnumerator 864 865memory = dict() 866 867for addr in itertools.chain(range(0x1f00), range(0xd000, 0xfff2), range(0xa000, 0xcf00), range(0x2000, 0x5f00)): 868 memory[addr] = addr & 0xff 869 870def answers_rd(resp, req): 871 global memory 872 if req.service != 0x34: 873 return False 874 if req.memorySizeLen in [1, 3, 4]: 875 return False 876 if req.memoryAddressLen in [1, 3, 4]: 877 return False 878 addr = getattr(req, "memoryAddress%d" % req.memoryAddressLen) 879 if addr not in memory.keys(): 880 return False 881 resp.memorySizeLen = req.memorySizeLen 882 return True 883 884resps = [EcuResponse(None, [UDS()/UDS_RDPR()], answers=answers_rd), 885 EcuResponse(None, [UDS()/UDS_NR(negativeResponseCode="requestOutOfRange", requestServiceId="RequestDownload")])] 886 887####################################################### 888scanner = executeScannerInVirtualEnvironment( 889 resps, [UDS_RDEnumerator], unstable_socket=False, 890 UDS_RDEnumerator_kwargs={"unittest": True}) 891 892assert scanner.scan_completed 893assert scanner.progress() > 0.95 894 895tc1 = scanner.configuration.test_cases[0] 896 897assert len(tc1.results_without_response) < 10 898if len(tc1.results_without_response): 899 tc1.show() 900 901assert len(tc1.results_with_negative_response) > 400 902assert len(tc1.results_with_positive_response) > 40 903assert len(tc1.scanned_states) == 1 904 905result = tc1.show(dump=True) 906 907assert "requestOutOfRange received " in result 908 909= UDS_RMBARandomEnumerator 910 911pkt = UDS_RMBARandomEnumerator._random_memory_addr_pkt(4, 4, 10) 912 913assert pkt.memorySizeLen == 4 914assert pkt.memoryAddressLen == 4 915assert pkt.memorySize4 == 10 916assert pkt.memoryAddress4 is not None 917 918pkt = UDS_RMBARandomEnumerator._random_memory_addr_pkt() 919 920assert pkt.memorySizeLen in [1, 2, 3, 4] 921assert pkt.memoryAddressLen in [1, 2, 3, 4] 922 923pkt2 = UDS_RMBARandomEnumerator._random_memory_addr_pkt() 924 925assert pkt != pkt2 926 927 928= UDS_RMBAEnumerator 929~ linux not_pypy 930 931memory = dict() 932 933mem_areas = [(0x100, 0x1f00), (0xd000, 0xff00), (0xa000, 0xc000), (0x3000, 0x5f00)] 934 935mem_ranges = [range(s, e) for s, e in mem_areas] 936 937mem_inner_borders = [s for s, _ in mem_areas] 938mem_inner_borders += [e - 1 for _, e in mem_areas] 939 940mem_outer_borders = [s - 1 for s, _ in mem_areas] 941mem_outer_borders += [e for _, e in mem_areas] 942 943mem_random_test_points = [] 944for _ in range(100): 945 mem_random_test_points += [random.choice(list(itertools.chain(*mem_ranges)))] 946 947for addr in itertools.chain(*mem_ranges): 948 memory[addr] = addr & 0xff 949 950def answers_rmba(resp, req): 951 global memory 952 if req.service != 0x23: 953 return False 954 if req.memorySizeLen in [1, 3, 4]: 955 return False 956 if req.memoryAddressLen in [1, 3, 4]: 957 return False 958 addr = getattr(req, "memoryAddress%d" % req.memoryAddressLen) 959 if addr not in memory.keys(): 960 return False 961 out_mem = list() 962 size = getattr(req, "memorySize%d" % req.memorySizeLen) 963 for i in range(addr, addr + size): 964 try: 965 out_mem.append(memory[i]) 966 except KeyError: 967 pass 968 resp.dataRecord = bytes(out_mem) 969 return True 970 971resps = [EcuResponse(None, [UDS()/UDS_RMBAPR(dataRecord=b'')], answers=answers_rmba), 972 EcuResponse(None, [UDS()/UDS_NR(negativeResponseCode="requestOutOfRange", requestServiceId="ReadMemoryByAddress")])] 973 974####################################################### 975scanner = executeScannerInVirtualEnvironment( 976 resps, [UDS_RMBAEnumerator], unstable_socket=False, 977 UDS_RMBARandomEnumerator_kwargs={"unittest": True}) 978 979assert scanner.scan_completed 980tc1 = scanner.configuration.stages[0][1] 981 982assert len(tc1.results_without_response) < 30 983if len(tc1.results_without_response): 984 tc1.show() 985 986assert len(tc1.results_with_negative_response) > 10 987assert len(tc1.results_with_positive_response) > 300 988assert len(tc1.scanned_states) == 1 989 990result = tc1.show(dump=True) 991 992assert "requestOutOfRange received " in result 993 994############################################################ 995 996addrs = tc1._get_memory_addresses_from_results(tc1.results_with_positive_response) 997 998print(float([tp in addrs for tp in mem_inner_borders].count(True)) / len(mem_inner_borders)) 999assert float([tp in addrs for tp in mem_inner_borders].count(True)) / len(mem_inner_borders) > 0.8 1000print(float([tp in addrs for tp in mem_random_test_points].count(True)) / len(mem_random_test_points)) 1001assert float([tp in addrs for tp in mem_random_test_points].count(True)) / len(mem_random_test_points) > 0.8 1002print(float([tp not in addrs for tp in mem_outer_borders].count(True)) / len(mem_outer_borders)) 1003assert float([tp not in addrs for tp in mem_outer_borders].count(True)) / len(mem_outer_borders) > 0.7 1004 1005 1006= UDS_TDEnumerator 1007 1008resps = [EcuResponse(None, [UDS()/UDS_TDPR(blockSequenceCounter=1)]), 1009 EcuResponse(None, [UDS()/UDS_TDPR(blockSequenceCounter=3)]), 1010 EcuResponse(None, [UDS()/UDS_NR(negativeResponseCode="subFunctionNotSupported", requestServiceId="TransferData")])] 1011 1012es = [UDS_TDEnumerator] 1013 1014scanner = executeScannerInVirtualEnvironment(resps, es) 1015 1016assert scanner.scan_completed 1017assert scanner.progress() > 0.95 1018 1019tc = scanner.configuration.test_cases[0] 1020 1021assert len(tc.results_without_response) < 10 1022if tc.results_without_response: 1023 tc.show() 1024 1025assert len(tc.results_with_negative_response) == 256 - 2 1026assert len(tc.results_with_positive_response) == 2 1027assert len(tc.scanned_states) == 1 1028 1029result = tc.show(dump=True) 1030 1031assert "subFunctionNotSupported received" in result 1032 1033ids = [t.req.blockSequenceCounter for t in tc.results_with_positive_response] 1034 1035assert 1 in ids 1036assert 3 in ids 1037 1038= BMW_DevJobEnumerator 1039 1040load_contrib("automotive.bmw.definitions") 1041load_contrib("automotive.bmw.enumerator") 1042 1043resps = [EcuResponse(None, [UDS()/DEV_JOB_PR(identifier=0xff00)/Raw(b"asdfbeef1")]), 1044 EcuResponse(None, [UDS()/DEV_JOB_PR(identifier=0xff02)/Raw(b"beef2")]), 1045 EcuResponse(None, [UDS()/DEV_JOB_PR(identifier=0xff03)/Raw(b"beef3")]), 1046 EcuResponse(None, [UDS()/DEV_JOB_PR(identifier=0xffff)/Raw(b"beefff")]), 1047 EcuResponse(None, [UDS()/UDS_NR(negativeResponseCode="subFunctionNotSupported", requestServiceId="DevelopmentJob")])] 1048 1049es = [BMW_DevJobEnumerator] 1050 1051scanner = executeScannerInVirtualEnvironment(resps, es, BMW_DevJobEnumerator_kwargs={"scan_range": range(0xFF00, 0x10000)}) 1052 1053assert scanner.scan_completed 1054assert scanner.progress() > 0.95 1055 1056tc = scanner.configuration.test_cases[0] 1057 1058assert len(tc.results_without_response) < 10 1059if tc.results_without_response: 1060 tc.show() 1061 1062assert len(tc.results_with_negative_response) == 0x100 - 4 1063assert len(tc.results_with_positive_response) == 4 1064assert len(tc.scanned_states) == 1 1065 1066result = tc.show(dump=True) 1067 1068assert "ReadTransportMessageStatus" in result 1069assert "65282" in result 1070assert "65283" in result 1071assert "ReadMemory" in result 1072assert "subFunctionNotSupported received" in result 1073assert "PR: Supported" in result 1074 1075ids = [t.req.identifier for t in tc.results_with_positive_response] 1076 1077assert 0xff00 in ids 1078assert 0xff02 in ids 1079assert 0xff03 in ids 1080assert 0xffff in ids 1081 1082= UDS_ServiceEnumerator weird issue 1083 1084resps = [EcuResponse(None, [UDS()/UDS_NR(negativeResponseCode=0x13, requestServiceId=0x40)]), 1085 EcuResponse(None, [UDS()/UDS_NR(negativeResponseCode="subFunctionNotSupported", requestServiceId=0x41)]), 1086 EcuResponse(None, [UDS()/UDS_NR(negativeResponseCode="subFunctionNotSupported", requestServiceId=0x11)]), 1087 EcuResponse(None, [UDS()/UDS_NR(negativeResponseCode="subFunctionNotSupported", requestServiceId=0x42)]), 1088 EcuResponse(None, [UDS()/UDS_NR(negativeResponseCode="subFunctionNotSupported", requestServiceId=0x43)])] 1089 1090es = [UDS_ServiceEnumerator] 1091 1092scanner = executeScannerInVirtualEnvironment( 1093 resps, es, UDS_ServiceEnumerator_kwargs={"scan_range": [0x11, 0x40, 0x41, 0x42]}) 1094 1095assert scanner.scan_completed 1096assert scanner.progress() > 0.95 1097tc = scanner.configuration.test_cases[0] 1098tc.show() 1099 1100assert len(tc.results_with_negative_response) == 4 1101 1102= UDS_ServiceEnumerator, all range 1103 1104def req_handler(resp, req): 1105 if req.service != 0x22: 1106 return False 1107 if len(req) == 1: 1108 resp.negativeResponseCode="generalReject" 1109 return True 1110 if len(req) == 2: 1111 resp.negativeResponseCode="incorrectMessageLengthOrInvalidFormat" 1112 return True 1113 if len(req) == 3: 1114 resp.negativeResponseCode="requestOutOfRange" 1115 return True 1116 return False 1117 1118resps = [EcuResponse(None, [UDS()/UDS_NR(negativeResponseCode="subFunctionNotSupported", requestServiceId="ReadDataByIdentifier")], req_handler)] 1119 1120es = [UDS_ServiceEnumerator] 1121 1122debug_dissector_backup = conf.debug_dissector 1123 1124# This Enumerator is sending corrupted Packets, therefore we need to disable the debug_dissector 1125conf.debug_dissector = False 1126scanner = executeScannerInVirtualEnvironment( 1127 resps, es, UDS_ServiceEnumerator_kwargs={"request_length": 3, "scan_range": range(256)}, unstable_socket=False) 1128conf.debug_dissector = debug_dissector_backup 1129 1130assert scanner.scan_completed 1131assert scanner.progress() > 0.95 1132tc = scanner.configuration.test_cases[0] 1133 1134assert len(tc.results_without_response) < 10 1135if tc.results_without_response: 1136 tc.show() 1137 1138tc.show() 1139 1140assert len(tc.scanned_states) == 1 1141 1142result = tc.show(dump=True) 1143 1144assert "incorrectMessageLengthOrInvalidFormat" in result 1145assert "requestOutOfRange" in result 1146 1147+ Cleanup 1148 1149= Delete testsockets 1150 1151 1152cleanup_testsockets() 1153