1% Regression tests for GMLAN Scanners 2~ scanner 3 4+ Configuration 5~ conf 6 7= Imports 8 9import itertools 10import logging 11import threading 12import time 13from scapy.contrib.isotp import ISOTPMessageBuilder 14 15from test.testsocket import TestSocket, cleanup_testsockets, open_test_sockets 16 17 18############ 19############ 20+ Load general modules 21 22= Load contribution layer 23 24from scapy.contrib.automotive.gm.gmlan import * 25conf.contribs['GMLAN']['GMLAN_ECU_AddressingScheme'] = 4 26from scapy.contrib.automotive.gm.gmlan_scanner import * 27from scapy.contrib.automotive.ecu import * 28load_layer("can") 29 30log_automotive.setLevel(logging.DEBUG) 31 32 33= Define Testfunction 34 35def executeScannerInVirtualEnvironment(supported_responses, enumerators, **kwargs): 36 tester = TestSocket(GMLAN) 37 ecu = TestSocket(GMLAN) 38 tester.pair(ecu) 39 answering_machine = EcuAnsweringMachine(supported_responses=supported_responses, main_socket=ecu, basecls=GMLAN, verbose=False) 40 def reset(): 41 answering_machine.reset_state() 42 sniff(timeout=0.001, opened_socket=[ecu, tester]) 43 sim = threading.Thread(target=answering_machine, kwargs={ 44 "timeout": 30, "stop_filter": lambda x: bytes(x) == b"\xff\xff\xff"}) 45 sim.start() 46 try: 47 scanner = GMLAN_Scanner( 48 tester, reset_handler=reset, 49 test_cases=enumerators, timeout=0.2, retry_if_none_received=True, 50 unittest=True, **kwargs) 51 def scanner_thread(): 52 for i in range(3): 53 print("Starting scan") 54 scanner.scan(timeout=10) 55 if scanner.scan_completed: 56 print("Scan completed after %d iterations" % i) 57 return 58 scanner_t = threading.Thread(target=scanner_thread) 59 scanner_t.start() 60 scanner_t.join(timeout=120) 61 if scanner_t.is_alive(): 62 scanner.stop_scan() 63 finally: 64 tester.send(Raw(b"\xff\xff\xff")) 65 sim.join(timeout=2) 66 assert not sim.is_alive() 67 cleanup_testsockets() 68 return scanner 69 70= Load packets from candump 71 72conf.contribs['CAN']['swap-bytes'] = True 73pkts = rdpcap(scapy_path("test/pcaps/candump_gmlan_scanner.pcap.gz")) 74assert len(pkts) 75 76= Create GMLAN messages from packets 77 78builder = ISOTPMessageBuilder(basecls=GMLAN, use_ext_address=False, rx_id=[0x241, 0x641]) 79msgs = list() 80 81for p in pkts: 82 if p.data == b"ECURESET": 83 msgs.append(p) 84 else: 85 builder.feed(p) 86 if len(builder): 87 msgs.append(builder.pop()) 88 89assert len(msgs) 90 91= Create ECU-Clone from packets 92 93mEcu = Ecu(logging=False, verbose=False, store_supported_responses=True) 94 95for p in msgs: 96 if isinstance(p, CAN) and p.data == b"ECURESET": 97 mEcu.reset() 98 else: 99 mEcu.update(p) 100 101assert len(mEcu.supported_responses) 102 103= Test GMLAN_SAEnumerator evaluate_response 104 105e = GMLAN_SAEnumerator() 106 107config = {} 108 109s = EcuState(session=1) 110 111debug_dissector_backup = conf.debug_dissector 112 113# This tests involves corrupted Packets, therefore we need to disable the debug_dissector 114conf.debug_dissector = False 115 116assert False == e._evaluate_response(s, GMLAN(b"\x27\x01"), None, **config) 117config = {"exit_if_service_not_supported": True} 118assert not e._retry_pkt[s] 119assert True == e._evaluate_response(s, GMLAN(b"\x27\x01"), GMLAN(b"\x7f\x27\x11"), **config) 120assert not e._retry_pkt[s] 121assert True == e._evaluate_response(s, GMLAN(b"\x27\x01"), GMLAN(b"\x7f\x27\x22"), **config) 122assert e._retry_pkt[s] == GMLAN(b"\x27\x01") 123assert False == e._evaluate_response(s, GMLAN(b"\x27\x02"), GMLAN(b"\x7f\x27\x22"), **config) 124assert not e._retry_pkt[s] 125assert True == e._evaluate_response(s, GMLAN(b"\x27\x01"), GMLAN(b"\x7f\x27\x37"), **config) 126assert e._retry_pkt[s] == GMLAN(b"\x27\x01") 127assert False == e._evaluate_response(s, GMLAN(b"\x27\x01"), GMLAN(b"\x7f\x27\x37"), **config) 128assert not e._retry_pkt[s] 129assert True == e._evaluate_response(s, GMLAN(b"\x27\x01"), GMLAN(b"\x67\x01ab"), **config) 130assert not e._retry_pkt[s] 131assert False == e._evaluate_response(s, GMLAN(b"\x27\x01"), GMLAN(b"\x67\x02ab"), **config) 132assert not e._retry_pkt[s] 133conf.debug_dissector = debug_dissector_backup 134 135 136= Simulate ECU and run Scanner 137 138def securityAccess_Algorithm1(seed): 139 return 0x5F51 140 141keyfunction = securityAccess_Algorithm1 142 143scanner = executeScannerInVirtualEnvironment( 144 mEcu.supported_responses, 145 [GMLAN_IDOEnumerator, GMLAN_PMEnumerator, GMLAN_RDEnumerator, GMLAN_SAEnumerator], 146 GMLAN_SAEnumerator_kwargs={"keyfunction": keyfunction, "scan_range": range(2)}, 147 GMLAN_PMEnumerator_kwargs={"unittest": True}) 148 149assert len(scanner.state_paths) == 9 150assert scanner.scan_completed 151 152assert EcuState(session=1) in scanner.final_states 153assert EcuState(session=1, security_level=2) in scanner.final_states 154assert EcuState(session=3, tp=1) in scanner.final_states 155assert EcuState(session=2, tp=1, communication_control=1) in scanner.final_states 156assert EcuState(session=2, tp=1, communication_control=1, security_level=2) in scanner.final_states 157assert EcuState(session=3, tp=1, security_level=2) in scanner.final_states 158assert EcuState(session=2, tp=1, communication_control=1, security_level=2, request_download=1) in scanner.final_states 159 160= Simulate ECU and test GMLAN_RDBIEnumerator 161 162resps = [EcuResponse(None, [GMLAN()/GMLAN_RDBIPR(dataIdentifier=1)/Raw(b"asdfbeef1")]), 163 EcuResponse(None, [GMLAN()/GMLAN_RDBIPR(dataIdentifier=2)/Raw(b"beef2")]), 164 EcuResponse(None, [GMLAN()/GMLAN_RDBIPR(dataIdentifier=3)/Raw(b"beef3")]), 165 EcuResponse(None, [GMLAN()/GMLAN_RDBIPR(dataIdentifier=0xff)/Raw(b"beefff")]), 166 EcuResponse(None, [GMLAN()/GMLAN_NR(returnCode="SubFunctionNotSupported", requestServiceId="ReadDataByIdentifier")])] 167 168es = [GMLAN_RDBIEnumerator] 169 170scanner = executeScannerInVirtualEnvironment(resps, es) 171 172assert scanner.scan_completed 173tc = scanner.configuration.test_cases[0] 174 175assert len(tc.results_without_response) < 10 176if tc.results_without_response: 177 tc.show() 178 179 180assert len(tc.results_with_negative_response) == 256 - 4 181assert len(tc.results_with_positive_response) == 4 182assert len(tc.scanned_states) == 1 183 184result = tc.show(dump=True) 185 186assert "asdfbeef1" in result 187assert "beef2" in result 188assert "beef3" in result 189assert "beefff" in result 190assert "SubFunctionNotSupported received" in result 191 192ids = [t.req.dataIdentifier for t in tc.results_with_positive_response] 193 194assert 1 in ids 195assert 2 in ids 196assert 3 in ids 197assert 0xff in ids 198 199 200= Simulate ECU and test GMLAN_WDBIEnumerator 201 202def wdbi_handler(resp, req): 203 if req.service != 0x3b: 204 return False 205 assert req.dataIdentifier in [1, 2, 3, 0xff] 206 resp.dataIdentifier = req.dataIdentifier 207 if req.dataIdentifier == 1: 208 assert req.dataRecord == b'asdfbeef1' 209 return True 210 if req.dataIdentifier == 2: 211 assert req.dataRecord == b'beef2' 212 return True 213 if req.dataIdentifier == 3: 214 assert req.dataRecord == b"beef3" 215 return True 216 if req.dataIdentifier == 0xff: 217 assert req.dataRecord == b"beefff" 218 return True 219 return False 220 221resps = [EcuResponse(None, [GMLAN()/GMLAN_RDBIPR(dataIdentifier=1)/Raw(b"asdfbeef1")]), 222 EcuResponse(None, [GMLAN()/GMLAN_RDBIPR(dataIdentifier=2)/Raw(b"beef2")]), 223 EcuResponse(None, [GMLAN()/GMLAN_RDBIPR(dataIdentifier=3)/Raw(b"beef3")]), 224 EcuResponse(None, [GMLAN()/GMLAN_RDBIPR(dataIdentifier=0xff)/Raw(b"beefff")]), 225 EcuResponse(None, [GMLAN()/GMLAN_WDBIPR()], answers=wdbi_handler), 226 EcuResponse(None, [GMLAN()/GMLAN_NR(returnCode="SubFunctionNotSupported", requestServiceId="ReadDataByIdentifier")])] 227 228es = [GMLAN_WDBISelectiveEnumerator] 229 230scanner = executeScannerInVirtualEnvironment(resps, es) 231 232assert scanner.scan_completed 233tc = scanner.configuration.test_cases[0][0] 234 235assert len(tc.results_without_response) < 10 236if tc.results_without_response: 237 tc.show() 238 239 240assert len(tc.results_with_negative_response) == 256 - 4 241assert len(tc.results_with_positive_response) == 4 242assert len(tc.scanned_states) == 1 243 244result = tc.show(dump=True) 245 246assert "asdfbeef1" in result 247assert "beef2" in result 248assert "beef3" in result 249assert "beefff" in result 250assert "SubFunctionNotSupported received" in result 251 252ids = [t.req.dataIdentifier for t in tc.results_with_positive_response] 253 254assert 1 in ids 255assert 2 in ids 256assert 3 in ids 257assert 0xff in ids 258 259######################### WDBI ############################# 260tc = scanner.configuration.test_cases[0][1] 261 262assert len(tc.results_without_response) < 10 263if tc.results_without_response: 264 tc.show() 265 266 267assert len(tc.results_with_negative_response) == 0 268assert len(tc.results_with_positive_response) == 4 269assert len(tc.scanned_states) == 1 270 271result = tc.show(dump=True) 272 273ids = [t.req.dataIdentifier for t in tc.results_with_positive_response] 274 275assert 1 in ids 276assert 2 in ids 277assert 3 in ids 278assert 0xff in ids 279 280 281= Simulate ECU and test GMLAN_RDBPIEnumerator 282 283resps = [EcuResponse(None, [GMLAN()/GMLAN_RDBPIPR(parameterIdentifier=1)/Raw(b"asdfbeef1")]), 284 EcuResponse(None, [GMLAN()/GMLAN_RDBPIPR(parameterIdentifier=2)/Raw(b"asdfbeef2")]), 285 EcuResponse(None, [GMLAN()/GMLAN_RDBPIPR(parameterIdentifier=3)/Raw(b"beef3")]), 286 EcuResponse(None, [GMLAN()/GMLAN_RDBPIPR(parameterIdentifier=0xffff)/Raw(b"beefffff")]), 287 EcuResponse(None, [GMLAN()/GMLAN_NR(returnCode="SubFunctionNotSupported", requestServiceId="ReadDataByParameterIdentifier")])] 288 289es = [GMLAN_RDBPIEnumerator] 290 291scanner = executeScannerInVirtualEnvironment(resps, es, GMLAN_RDBPIEnumerator_kwargs={"scan_range":list(range(0x100)) + list(range(0xff00, 0x10000))}) 292 293assert scanner.scan_completed 294tc = scanner.configuration.test_cases[0] 295 296assert len(tc.results_without_response) < 10 297if tc.results_without_response: 298 tc.show() 299 300 301assert len(tc.results_with_negative_response) == 0x200 - 4 302assert len(tc.results_with_positive_response) == 4 303assert len(tc.scanned_states) == 1 304 305result = tc.show(dump=True) 306 307assert "asdfbeef1" in result 308assert "asdfbeef2" in result 309assert "beef3" in result 310assert "beefffff" in result 311assert "SubFunctionNotSupported received" in result 312 313ids = [t.req.identifiers[0] for t in tc.results_with_positive_response] 314 315assert 1 in ids 316assert 2 in ids 317assert 3 in ids 318assert 0xffff in ids 319 320= Simulate ECU and test GMLAN_TPEnumerator 321 322resps = [EcuResponse(None, [GMLAN(service=0x7e)])] 323 324es = [GMLAN_TPEnumerator] 325scanner = executeScannerInVirtualEnvironment(resps, es) 326 327assert scanner.scan_completed 328tc = scanner.configuration.test_cases[0] 329 330assert len(tc.results_without_response) < 10 331if tc.results_without_response: 332 tc.show() 333 334 335assert len(tc.results_with_negative_response) == 0 336assert len(tc.results_with_positive_response) == 2 337assert len(tc.scanned_states) == 2 338 339 340= Simulate ECU and test GMLAN_DCEnumerator 341 342resps = [EcuResponse(None, [GMLAN()/GMLAN_DCPR(CPIDNumber=1)]), 343 EcuResponse(None, [GMLAN()/GMLAN_DCPR(CPIDNumber=2)]), 344 EcuResponse(None, [GMLAN()/GMLAN_DCPR(CPIDNumber=3)/Raw(b"beef3")]), 345 EcuResponse(None, [GMLAN()/GMLAN_DCPR(CPIDNumber=0xff)/Raw(b"beefff")]), 346 EcuResponse(None, [GMLAN()/GMLAN_NR(returnCode="SubFunctionNotSupported", requestServiceId="DeviceControl")])] 347 348es = [GMLAN_DCEnumerator] 349scanner = executeScannerInVirtualEnvironment(resps, es) 350 351assert scanner.scan_completed 352tc = scanner.configuration.test_cases[0] 353 354assert len(tc.results_without_response) < 10 355if tc.results_without_response: 356 tc.show() 357 358 359assert len(tc.results_with_negative_response) == 256 - 4 360assert len(tc.results_with_positive_response) == 4 361assert len(tc.scanned_states) == 1 362 363ids = [t.req.CPIDNumber for t in tc.results_with_positive_response] 364 365assert 1 in ids 366assert 2 in ids 367assert 3 in ids 368assert 255 in ids 369 370result = tc.show(dump=True) 371 372assert "SubFunctionNotSupported received " in result 373 374= Simulate ECU and test GMLAN_TDEnumerator 375 376conf.contribs['GMLAN']['GMLAN_ECU_AddressingScheme'] = 4 377 378positive_responses_left = 4 379 380def answers_td(resp, req): 381 global positive_responses_left 382 if req.service != 0x36: 383 return False 384 if not positive_responses_left: 385 return False 386 positive_responses_left -= 1 387 resp.service = 0x76 388 return True 389 390resps = [EcuResponse(None, [GMLAN(service="TransferDataPositiveResponse")], answers=answers_td), 391 EcuResponse(None, [GMLAN()/GMLAN_NR(returnCode="RequestOutOfRange", requestServiceId="TransferData")])] 392 393es = [GMLAN_TDEnumerator] 394scanner = executeScannerInVirtualEnvironment(resps, es) 395 396assert scanner.scan_completed 397tc = scanner.configuration.test_cases[0] 398 399assert len(tc.results_without_response) < 10 400if tc.results_without_response: 401 tc.show() 402 403 404assert len(tc.results_with_negative_response) == 0x1ff - 4 405assert len(tc.results_with_positive_response) == 4 406assert len(tc.scanned_states) == 1 407 408result = tc.show(dump=True) 409 410assert "RequestOutOfRange received " in result 411 412= Simulate ECU and test GMLAN_RMBAEnumerator 1 413~ not_pypy 414 415conf.contribs['GMLAN']['GMLAN_ECU_AddressingScheme'] = 2 416 417memory = dict() 418 419mem_areas = [(0x100, 0x1f00), (0xd000, 0xff00), (0xa000, 0xc000), (0x3000, 0x5f00)] 420 421mem_ranges = [range(s, e) for s, e in mem_areas] 422 423mem_inner_borders = [s for s, _ in mem_areas] 424mem_inner_borders += [e - 1 for _, e in mem_areas] 425 426mem_outer_borders = [s - 1 for s, _ in mem_areas] 427mem_outer_borders += [e for _, e in mem_areas] 428 429mem_random_test_points = [] 430for _ in range(100): 431 mem_random_test_points += [random.choice(list(itertools.chain(*mem_ranges)))] 432 433for addr in itertools.chain(*mem_ranges): 434 memory[addr] = addr & 0xff 435 436def answers_rmba(resp, req): 437 global memory 438 if req.service != 0x23: 439 return False 440 if req.memoryAddress not in memory.keys(): 441 return False 442 out_mem = list() 443 for i in range(req.memoryAddress, req.memoryAddress + req.memorySize): 444 try: 445 out_mem.append(memory[i]) 446 except KeyError: 447 pass 448 resp.memoryAddress = req.memoryAddress 449 resp.dataRecord = bytes(out_mem) 450 return True 451 452resps = [EcuResponse(None, [GMLAN()/GMLAN_RMBAPR(memoryAddress=0, dataRecord=b'')], answers=answers_rmba), 453 EcuResponse(None, [GMLAN()/GMLAN_NR(returnCode="RequestOutOfRange", requestServiceId="ReadMemoryByAddress")])] 454 455####################################################### 456scanner = executeScannerInVirtualEnvironment(resps, [GMLAN_RMBAEnumerator]) 457 458assert scanner.scan_completed 459tc1 = scanner.configuration.test_cases[0] 460 461assert len(tc1.results_without_response) < 10 462assert len(tc1.results_with_negative_response) > 10 463assert len(tc1.results_with_positive_response) > 50 464assert len(tc1.scanned_states) == 1 465 466result = tc1.show(dump=True) 467 468assert "RequestOutOfRange received " in result 469 470 471def _get_memory_addresses_from_results(results): 472 mem_areas = [ 473 range(tup.req.memoryAddress, tup.req.memoryAddress + tup.req.memorySize) 474 for tup in results] 475 return set(list(itertools.chain.from_iterable(mem_areas))) 476 477############################################################ 478 479addrs = _get_memory_addresses_from_results(tc1.results_with_positive_response) 480 481print([tp in addrs for tp in mem_inner_borders].count(True) / len(mem_inner_borders)) 482assert [tp in addrs for tp in mem_inner_borders].count(True) / len(mem_inner_borders) > 0.8 483print([tp in addrs for tp in mem_random_test_points].count(True) / len(mem_random_test_points)) 484assert [tp in addrs for tp in mem_random_test_points].count(True) / len(mem_random_test_points) > 0.8 485print([tp not in addrs for tp in mem_outer_borders].count(True) / len(mem_outer_borders)) 486assert [tp not in addrs for tp in mem_outer_borders].count(True) / len(mem_outer_borders) > 0.8 487 488 489= Simulate ECU and test GMLAN_RMBAEnumerator 2 490* This test takes very long to execute 491 492~ disabled 493 494conf.contribs['GMLAN']['GMLAN_ECU_AddressingScheme'] = 3 495 496memory = dict() 497 498for addr in itertools.chain(range(0x10000), range(0xf00000, 0xf0f000)): 499 memory[addr] = addr & 0xff 500 501resps = [EcuResponse(None, [GMLAN()/GMLAN_RMBAPR(memoryAddress=0, dataRecord=b'')], answers=answers_rmba), 502 EcuResponse(None, [GMLAN()/GMLAN_NR(returnCode="RequestOutOfRange", requestServiceId="ReadMemoryByAddress")])] 503 504scanner = executeScannerInVirtualEnvironment(resps, [GMLAN_RMBAEnumerator]) 505 506assert scanner.scan_completed 507tc = scanner.configuration.test_cases[0] 508 509assert len(tc.results_without_response) < 10 510if tc.results_without_response: 511 tc.show() 512 513 514assert len(tc.results_with_negative_response) > 350 515assert len(tc.results_with_positive_response) > 50 516assert len(tc.scanned_states) == 1 517 518addrs = [t.req.memoryAddress for t in tc.results_with_positive_response] 519 520assert 0 in addrs 521assert 0x10 in addrs 522assert 0xf0 in addrs 523assert 0x3000 in addrs 524assert 0x3090 in addrs 525assert 0xa100 in addrs 526assert 0xa1f0 in addrs 527assert 0xa200 in addrs 528assert 0xa2f0 in addrs 529assert 0xf000 in addrs 530assert 0xf0f0 in addrs 531 532result = tc.show(dump=True) 533 534assert "RequestOutOfRange received " in result 535 536+ Cleanup 537 538= Delete TestSockets 539 540cleanup_testsockets() 541