1% Regression tests for the Ecu utility 2 3# More information at http://www.secdev.org/projects/UTscapy/ 4 5############ 6############ 7 8+ Setup 9~ conf command 10 11= Load modules 12 13import copy 14import itertools 15 16load_contrib("isotp", globals_dict=globals()) 17load_contrib("automotive.uds", globals_dict=globals()) 18load_contrib("automotive.gm.gmlan", globals_dict=globals()) 19load_layer("can", globals_dict=globals()) 20conf.contribs["CAN"]["swap-bytes"] = True 21 22= Load Ecu module 23 24load_contrib("automotive.ecu", globals_dict=globals()) 25from scapy.contrib.automotive.uds_ecu_states import * 26from scapy.contrib.automotive.uds_logging import * 27from scapy.contrib.automotive.gm.gmlan_ecu_states import * 28from scapy.contrib.automotive.gm.gmlan_logging import * 29 30+ EcuState Basic checks 31 32= Check EcuState basic functionality 33 34state = EcuState() 35state["session"] = 2 36state["securityAccess"] = 4 37print(repr(state)) 38assert repr(state) == "securityAccess4session2" 39 40= More complex tests 41 42state = EcuState(ses=4) 43assert state.ses == 4 44state.ses = 5 45assert state.ses == 5 46 47= Even more complex tests 48 49state = EcuState(myinfo="42") 50 51state.ses = 5 52assert state.ses == 5 53 54state["ses"] = None 55assert state.ses is None 56 57state.ses = 5 58assert 5 == state.ses 59 60assert "42" == state.myinfo 61assert repr(state) == "myinfo42ses5" 62 63= Delete Attribute Test 64 65state = EcuState(myinfo="42") 66 67state.ses = 5 68assert state.ses == 5 69 70del state.ses 71 72try: 73 x = state.ses 74 assert False 75except (KeyError, AttributeError): 76 assert state.myinfo == "42" 77 78= Copy tests 79 80state = EcuState(myinfo="42") 81state.ses = 5 82 83ns = copy.copy(state) 84 85ns.ses = 6 86 87assert ns.ses == 6 88assert state.ses == 5 89assert ns.myinfo == "42" 90 91 92= Move tests 93 94state = EcuState(myinfo="42") 95state.ses = 5 96 97ns = state 98 99ns.ses = 6 100 101assert ns.ses == 6 102assert state.ses == 6 103assert ns.myinfo == "42" 104 105= equal tests 106 107state = EcuState(myinfo="42") 108state.ses = 5 109 110ns = copy.copy(state) 111 112assert state == ns 113assert hash(state) == hash(ns) 114 115ns.ses = 6 116 117assert state != ns 118assert hash(state) != hash(ns) 119 120ns.ses = 5 121 122assert state == ns 123assert hash(state) == hash(ns) 124 125ns.sa = 5 126 127assert state != ns 128assert hash(state) != hash(ns) 129 130 131= hash tests 132 133state = EcuState(myinfo="42") 134state.ses = 5 135 136ns = copy.copy(state) 137 138assert hash(state) == hash(ns) 139 140ns.ses = 6 141 142assert hash(state) != hash(ns) 143 144ns.ses = 5 145 146assert hash(state) == hash(ns) 147 148ns.sa = 5 149 150assert hash(state) != hash(ns) 151 152= command tests 153 154state = EcuState(myinfo="42") 155state.ses = 5 156 157state.command() 158assert "EcuState(myinfo='42', ses=5)" == state.command() 159 160= less than tests 161 162s1 = EcuState() 163s2 = EcuState() 164 165s1.a = 1 166s2.a = 2 167 168assert s1 < s2 169 170s1.b = 4 171 172assert s1 > s2 173 174s2.b = 1 175 176assert s1 < s2 177 178s1.a = 2 179 180assert s1 > s2 181 182= less than tests 2 183 184s1 = EcuState() 185s2 = EcuState() 186 187s1.c = "x" 188s2.c = 4 189exception = False 190 191try: 192 assert s1 < s2 193except TypeError: 194 exception = True 195 196assert exception 197 198= less than tests 3 199 200s1 = EcuState() 201s2 = EcuState() 202 203 204s1.A = 1 205s1.a = 2 206 207s2.A = 2 208s2.a = 1 209 210assert s1 < s2 211 212= less than tests 4 213 214s1 = EcuState() 215s2 = EcuState() 216 217 218s1.A = 1 219s1.a = 2 220 221s2.A = 2 222s2.b = 100 223 224assert s1 < s2 225 226= less than tests 5 227 228s1 = EcuState() 229s2 = EcuState() 230 231 232s1.A = 100 233s1.a = 2 234 235s2.A = 2 236s2.b = 100 237 238assert s1 > s2 239assert not s1 > s1 240assert not s1 < s1 241 242= less than tests 6 243 244s1 = EcuState() 245s2 = EcuState() 246 247 248s1.A = 100 249s1.B = 200 250 251s2.a = 2 252s2.b = 1 253 254assert s1 < s2 255 256= contains test 257 258s1 = EcuState(ses=[1,2,3]) 259s2 = EcuState(ses=1) 260 261assert s1 != s2 262assert s2 in s1 263assert s1 not in s2 264 265s1 = EcuState(ses=[2,3]) 266s2 = EcuState(ses=1) 267 268assert s1 != s2 269assert s2 not in s1 270assert s1 not in s2 271 272 273s1 = EcuState(ses=[1,2,3], security=5) 274s2 = EcuState(ses=1) 275 276assert s1 != s2 277assert s2 not in s1 278assert s1 not in s2 279 280s1 = EcuState(ses=range(4), security=[None, 5]) 281s2 = EcuState(ses=1) 282 283assert s1 != s2 284assert s2 in s1 285assert s1 not in s2 286 287s1 = EcuState(ses=range(4), security=[None, 5]) 288s2 = EcuState(ses=range(2)) 289 290assert s1 != s2 291assert s2 < s1 292assert s2 in s1 293assert s1 not in s2 294 295s1 = EcuState(ses=range(4), security=[None, 5]) 296s2 = EcuState(ses=range(2), security=5) 297 298assert s1 != s2 299assert s2 in s1 300assert s1 not in s2 301 302s1 = EcuState(ses=range(4), security=[None, 5]) 303s2 = EcuState(ses=range(5)) 304 305assert s1 != s2 306assert s2 not in s1 307assert s1 not in s2 308 309s1 = EcuState(ses=range(4), security=[None, range(5)]) 310s2 = EcuState(ses=3) 311 312print(s1._expand()) 313print(s2._expand()) 314 315assert s1 != s2 316assert s2 in s1 317assert s1 not in s2 318 319s1 = EcuState(ses=range(4), security=[None, range(5), [5, 7, range(4), [range(10), 10]]]) 320s2 = EcuState(ses=3, security=10) 321 322print(s1._expand()) 323print(s2._expand()) 324 325assert s1 != s2 326assert s2 in s1 327assert s1 not in s2 328 329s1 = EcuState(ses=range(4), security=[None, range(5), [5, 7, range(4), [range(10), "B"]]]) 330s2 = EcuState(ses=3, security="B") 331 332print(s1._expand()) 333print(s2._expand()) 334 335assert s1 != s2 336assert s2 in s1 337assert s1 not in s2 338 339s1 = EcuState(ses=range(4), security=[None, range(5), [5, 7, range(4), [range(10), "B"]]]) 340s2 = EcuState(ses=3, security="C") 341 342print(s1._expand()) 343print(s2._expand()) 344 345assert s1 != s2 346assert s2 not in s1 347assert s1 not in s2 348 349s1 = EcuState(ses=range(3), security=5) 350s2 = EcuState(ses=1, security=5) 351 352assert s1 != s2 353assert s2 in s1 354assert s1 not in s2 355 356s1 = EcuState(ses=range(3), security=(x for x in range(1, 10, 2))) 357s2 = EcuState(ses=1, security=5) 358 359assert s1 != s2 360assert s2 in s1 361assert s1 not in s2 362 363s1 = EcuState(ses=[1,2,3]) 364s2 = EcuState(ses=[1,2,3]) 365 366assert s1 in s2 367assert s2 in s1 368assert s1 == s2 369 370s1 = EcuState(ses=1) 371s2 = EcuState(ses=1) 372 373assert s1 in s2 374assert s2 in s1 375assert s1 == s2 376 377s1 = EcuState(ses=range(3), security=range(5)) 378for ses, sec in itertools.product(range(3), range(5)): 379 s2 = EcuState(ses=ses, security=sec) 380 assert s1 != s2 381 assert s2 in s1 382 assert s1 not in s2 383 384 385s1 = EcuState(ses=[0, 1, 2], security=[43, 44]) 386for ses, sec in itertools.product(range(3), range(43, 45)): 387 s2 = EcuState(ses=ses, security=sec) 388 assert s1 != s2 389 assert s2 in s1 390 assert s1 not in s2 391 392s1 = EcuState(ses=[0, 1, 2], security=["a", "b"]) 393for ses, sec in itertools.product(range(3), (x for x in "ab")): 394 s2 = EcuState(ses=ses, security=sec) 395 assert s1 != s2 396 assert s2 in s1 397 try: 398 assert s1 not in s2 399 except TypeError: 400 assert True 401 402s1 = [EcuState(ses=1), EcuState(ses=2), EcuState(ses=3)] 403s2 = EcuState(ses=3) 404 405assert s2 in s1 406assert s1 not in s2 407 408 409s1 = EcuState(ses=1, sa="SEC") 410s2 = EcuState(ses=1, sa="SOC") 411 412assert s1 not in s2 413assert s2 not in s1 414assert s1 != s2 415 416s1 = EcuState(ses=1, sa="SEC") 417s2 = EcuState(ses=1, sa="SEC") 418 419assert s1 in s2 420assert s2 in s1 421assert s1 == s2 422 423 424s1 = EcuState(ses=1, sa="SEC") 425s2 = EcuState(ses=1, sa=["SEC", "SOL"]) 426 427 428assert s1 in s2 429assert s2 not in s1 430assert s1 != s2 431 432 433s1 = EcuState(ses=1, sa=b"SEC") 434s2 = EcuState(ses=1, sa=[b"SEC", "SOL"]) 435 436assert s1 in s2 437assert s2 not in s1 438assert s1 != s2 439 440 441+ EcuState modification tests 442 443= Basic definitions for tests 444 445class myPack1(Packet): 446 fields_desc = [ 447 IntField("fakefield", 1) 448 ] 449 450class myPack2(Packet): 451 fields_desc = [ 452 IntField("statefield", 1) 453 ] 454 455@EcuState.extend_pkt_with_modifier(myPack2) 456def modify_ecu_state(self, req, ecustate): 457 # type: (Packet, Packet, EcuState) -> None 458 ecustate.state = self.statefield 459 460pkt = myPack1()/myPack2() 461st = EcuState() 462exception = False 463 464try: 465 assert st.state == 1 466except AttributeError: 467 exception = True 468 469assert exception == True 470assert EcuState.is_modifier_pkt(pkt) 471assert not EcuState.is_modifier_pkt(myPack1()) 472 473mod = EcuState.get_modified_ecu_state(pkt, Raw(), st) 474assert mod != st 475assert mod.state ==1 476 477pkt2 = myPack1()/myPack1()/myPack1()/myPack2(statefield=5) 478mod2 = EcuState.get_modified_ecu_state(pkt2, Raw(), mod) 479 480assert mod != mod2 481assert mod < mod2 482 483pkt2 = myPack1()/myPack1()/myPack1()/myPack2(statefield=4)/myPack2(statefield=5) 484mod2 = EcuState.get_modified_ecu_state(pkt2, Raw(), mod) 485mod.state = 5 486assert mod != mod2 487assert mod > mod2 488 489+ EcuResponse tests 490 491= Basic checks 492 493resp = EcuResponse(EcuState(session=1), UDS()/UDS_DSCPR(b"\x03")) 494 495assert not resp.supports_state(EcuState()) 496assert not resp.supports_state(EcuState(session=2)) 497assert resp.supports_state(EcuState(session=1)) 498assert resp.answers(UDS()/UDS_DSC(b"\x03")) 499 500= Command checks 501 502resp = EcuResponse(EcuState(session=1), UDS()/UDS_DSCPR(b"\x03")) 503cmd = resp.command() 504 505print(cmd) 506resp1 = eval(cmd) 507assert resp1 == resp 508 509= Command checks 2 510 511p1 = UDS(bytes(UDS()/UDS_NR(b"\x10\x00"))) 512p2 = UDS(bytes(UDS()/UDS_DSCPR(b"\x03"))) 513 514resp = EcuResponse([EcuState(session=1), EcuState(session=3)], [p1, p2]) 515cmd = resp.command() 516 517print(cmd) 518resp1 = eval(cmd) 519assert any(resp1.supports_state(s) for s in resp.states) 520assert any(resp.supports_state(s) for s in resp1.states) 521assert len(resp.responses) == len(resp1.responses) 522assert all(bytes(x) == bytes(y) for x, y in zip(resp.responses, resp1.responses)) 523assert resp1 == resp 524 525= Compare check 526 527p1 = UDS(bytes(UDS()/UDS_NR(b"\x10\x00"))) 528p2 = UDS(bytes(UDS()/UDS_DSCPR(b"\x03"))) 529 530resp = EcuResponse([EcuState(session=1), EcuState(session=3)], [p1, p2]) 531 532resp1 = EcuResponse([EcuState(session=1)], [p1, p2]) 533 534resp2 = EcuResponse([EcuState(session=2)], [p1, p2]) 535resp3 = EcuResponse([EcuState(session=1)], [p2]) 536 537 538assert resp == resp1 539assert resp != resp2 540assert resp != resp3 541 542= Key response check 543 544req = UDS()/UDS_DSC(b"\x03") 545p1 = UDS(bytes(UDS()/UDS_NR(b"\x10\x00"))) 546p2 = UDS(bytes(UDS()/UDS_DSCPR(b"\x03"))) 547 548resp = EcuResponse([EcuState(session=1), EcuState(session=3)], [p1, p2]) 549 550assert resp.answers(req) 551assert resp.key_response.answers(req) 552 553 554 555+ Ecu Simple operations 556 557= Log all commands applied to an Ecu 558 559msgs = [UDS(service=16) / UDS_DSC(diagnosticSessionType=3), 560 UDS(service=16) / UDS_DSC(diagnosticSessionType=4), 561 UDS(service=16) / UDS_DSC(diagnosticSessionType=5), 562 UDS(service=16) / UDS_DSC(diagnosticSessionType=6), 563 UDS(service=16) / UDS_DSC(diagnosticSessionType=2)] 564 565ecu = Ecu(verbose=False, store_supported_responses=False) 566ecu.update(PacketList(msgs)) 567assert len(ecu.log["DiagnosticSessionControl"]) == 5 568timestamp, value = ecu.log["DiagnosticSessionControl"][0] 569assert timestamp > 0 570assert value == "extendedDiagnosticSession" 571assert ecu.log["DiagnosticSessionControl"][-1][1] == "programmingSession" 572 573 574= Trace all commands applied to an Ecu 575 576msgs = [UDS(service=16) / UDS_DSC(diagnosticSessionType=3), 577 UDS(service=80) / UDS_DSCPR(diagnosticSessionType=3, sessionParameterRecord=b'\\x002\\x01\\xf4')] 578 579ecu = Ecu(verbose=True, logging=False, store_supported_responses=False) 580ecu.update(PacketList(msgs)) 581assert ecu.state.session == 3 582 583 584= Generate supported responses of an Ecu 585 586msgs = [UDS(service=16) / UDS_DSC(diagnosticSessionType=3), 587 UDS(service=80) / UDS_DSCPR(diagnosticSessionType=3, sessionParameterRecord=b'\\x002\\x01\\xf4'), 588 UDS(service=16) / UDS_DSC(diagnosticSessionType=4)] 589 590ecu = Ecu(verbose=False, logging=False, store_supported_responses=True) 591ecu.update(PacketList(msgs)) 592supported_responses = ecu.supported_responses 593unanswered_packets = ecu.unanswered_packets 594assert ecu.state.session == 3 595assert len(supported_responses) == 1 596assert len(unanswered_packets) == 1 597 598response = supported_responses[0] 599print(response.command()) 600assert response.supports_state(EcuState()) 601assert response.key_response.service == 80 602assert unanswered_packets[0].diagnosticSessionType == 4 603 604 605+ Ecu Advanced checks 606 607= Analyze multiple UDS messages 608 609udsmsgs = sniff(offline=scapy_path("test/pcaps/ecu_trace.pcap.gz"), 610 session=ISOTPSession(use_ext_address=False, basecls=UDS), 611 count=50, timeout=3) 612 613assert len(udsmsgs) == 50 614 615ecu = Ecu() 616ecu.update(udsmsgs) 617response = ecu.supported_responses[0] 618assert response.supports_state(EcuState()) 619assert response.key_response.service == 80 620assert response.key_response.diagnosticSessionType == 3 621response = ecu.supported_responses[1] 622assert response.supports_state(EcuState(session=3)) 623assert response.key_response.service == 80 624assert response.key_response.diagnosticSessionType == 2 625response = ecu.supported_responses[4] 626print(response) 627state = EcuState(session=2, security_level=18) 628print(state) 629assert response.supports_state(state) 630assert response.key_response.service == 110 631assert response.key_response.dataIdentifier == 61786 632assert len(ecu.log["TransferData"]) == 2 633 634+ EcuSession tests 635 636= Analyze on the fly with EcuSession 637 638session = EcuSession() 639 640with PcapReader(scapy_path("test/pcaps/ecu_trace.pcap.gz")) as sock: 641 udsmsgs = sniff(session=ISOTPSession(supersession=session, use_ext_address=False, basecls=UDS), count=50, opened_socket=sock, timeout=3) 642 643assert len(udsmsgs) == 50 644 645ecu = session.ecu 646response = ecu.supported_responses[0] 647assert response.supports_state(EcuState()) 648assert response.key_response.service == 80 649assert response.key_response.diagnosticSessionType == 3 650response = ecu.supported_responses[1] 651assert response.supports_state(EcuState(session=3)) 652assert response.key_response.service == 80 653assert response.key_response.diagnosticSessionType == 2 654response = ecu.supported_responses[4] 655print(response) 656state = EcuState(session=2, security_level=18) 657print(state) 658assert response.supports_state(state) 659assert response.key_response.service == 110 660assert response.key_response.dataIdentifier == 61786 661assert len(ecu.log["TransferData"]) == 2 662 663 664= Analyze on the fly with EcuSession GMLAN1 665 666session = EcuSession() 667 668conf.contribs['CAN']['swap-bytes'] = True 669 670with PcapReader(scapy_path("test/pcaps/gmlan_trace.pcap.gz")) as sock: 671 gmlanmsgs = sniff(session=ISOTPSession(supersession=session, rx_id=[0x241, 0x641, 0x101], basecls=GMLAN), count=2, opened_socket=sock, timeout=3) 672 ecu = session.ecu 673 print("Check 1 after change to diagnostic mode") 674 assert len(ecu.supported_responses) == 1 675 assert ecu.state == EcuState(session=3) 676 gmlanmsgs = sniff(session=ISOTPSession(supersession=session, rx_id=[0x241, 0x641, 0x101], basecls=GMLAN), count=6, opened_socket=sock) 677 ecu = session.ecu 678 print("Check 2 after some more messages were read1") 679 assert len(ecu.supported_responses) == 3 680 print("Check 2 after some more messages were read2") 681 assert ecu.state.session == 3 682 print("assert 1") 683 assert ecu.state.communication_control == 1 684 gmlanmsgs = sniff(session=ISOTPSession(supersession=session, rx_id=[0x241, 0x641, 0x101], basecls=GMLAN), count=2, opened_socket=sock) 685 ecu = session.ecu 686 print("Check 3 after change to programming mode (bootloader)") 687 assert len(ecu.supported_responses) == 4 688 assert ecu.state.session == 2 689 assert ecu.state.communication_control == 1 690 gmlanmsgs = sniff(session=ISOTPSession(supersession=session, rx_id=[0x241, 0x641, 0x101], basecls=GMLAN), count=6, opened_socket=sock) 691 ecu = session.ecu 692 print("Check 4 after gaining security access") 693 assert len(ecu.supported_responses) == 6 694 assert ecu.state.session == 2 695 assert ecu.state.security_level == 2 696 assert ecu.state.communication_control == 1 697 698= Analyze on the fly with EcuSession GMLAN logging test 699 700session = EcuSession(verbose=False, store_supported_responses=False) 701 702conf.contribs['GMLAN']['GMLAN_ECU_AddressingScheme'] = 4 703conf.contribs['CAN']['swap-bytes'] = True 704 705conf.debug_dissector = True 706gmlanmsgs = sniff(offline=scapy_path("test/pcaps/gmlan_trace.pcap.gz"), 707 session=ISOTPSession(supersession=session, rx_id=[0x241, 0x641, 0x101], basecls=GMLAN), 708 count=200, timeout=6) 709 710ecu = session.ecu 711assert len(ecu.supported_responses) == 0 712 713assert len([m for m in gmlanmsgs if m.sprintf("%GMLAN.service%") == "TransferData"]) == len(ecu.log["TransferData"]) 714assert len([m for m in gmlanmsgs if m.sprintf("%GMLAN.service%") == "RequestDownload"]) == len(ecu.log["RequestDownload"]) 715assert len([m for m in gmlanmsgs if m.sprintf("%GMLAN.service%") == "ReadDataByIdentifier"]) == len(ecu.log["ReadDataByIdentifier"]) 716 717assert len(ecu.log["SecurityAccess"]) == 2 718assert len(ecu.log["SecurityAccessPositiveResponse"]) == 2 719 720assert ecu.log["TransferData"][-1][1][0] == "downloadAndExecuteOrExecute" 721