% Regression tests for gmlanutil ~ scanner + Configuration ~ conf = Imports from scapy.contrib.automotive import log_automotive from test.testsocket import TestSocket, cleanup_testsockets import logging ############ ############ + Load general modules = Load contribution layer load_layer("can", globals_dict=globals()) load_contrib("automotive.gm.gmlan", globals_dict=globals()) load_contrib("automotive.gm.gmlanutils", globals_dict=globals()) log_automotive.setLevel(logging.DEBUG) = Define test sockets isotpsock2 = TestSocket(GMLAN) isotpsock = TestSocket(GMLAN) isotpsock2.pair(isotpsock) ############################################################################## + GMLAN_RequestDownload Tests ############################################################################## = Positive, immediate positive response ecusimSuccessfullyExecuted = False started = threading.Event() def ecusim(): global ecusimSuccessfullyExecuted ecusimSuccessfullyExecuted= True requ = isotpsock2.sniff(count=1, timeout=1, started_callback=started.set) pkt = GMLAN()/GMLAN_RD(memorySize=4) if bytes(requ[0]) != bytes(pkt): ecusimSuccessfullyExecuted = False else: ecusimSuccessfullyExecuted = True ack = b"\x74" isotpsock2.send(ack) thread = threading.Thread(target=ecusim) sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2]) thread.start() started.wait(timeout=5) res = GMLAN_RequestDownload(isotpsock, 4, timeout=1) == True thread.join(timeout=5) assert res assert ecusimSuccessfullyExecuted == True = Negative, immediate negative response started = threading.Event() def ecusim(): isotpsock2.sniff(count=1, timeout=1, started_callback=started.set) nr = GMLAN()/GMLAN_NR(requestServiceId=0x34, returnCode=0x22) isotpsock2.send(nr) thread = threading.Thread(target=ecusim) sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2]) thread.start() started.wait(timeout=5) res = GMLAN_RequestDownload(isotpsock, 4, timeout=1) == False thread.join(timeout=5) assert res = Negative, timeout assert 
GMLAN_RequestDownload(isotpsock, 4, timeout=0.01) == False ############################ Response pending = Positive, after response pending started = threading.Event() def ecusim(): isotpsock2.sniff(count=1, timeout=2, started_callback=started.set) pending = GMLAN()/GMLAN_NR(requestServiceId=0x34, returnCode=0x78) isotpsock2.send(pending) ack = b"\x74" isotpsock2.send(ack) thread = threading.Thread(target=ecusim) sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2]) thread.start() started.wait(timeout=5) res = GMLAN_RequestDownload(isotpsock, 4, timeout=2) == True thread.join(timeout=5) assert res = Positive, hold response pending for several messages tout = 0.1 repeats = 4 started = threading.Event() def ecusim(): isotpsock2.sniff(count=1, timeout=1, started_callback=started.set) ack = GMLAN()/GMLAN_NR(requestServiceId=0x34, returnCode=0x78) for i in range(repeats): isotpsock2.send(ack) time.sleep(tout) ack = b"\x74" isotpsock2.send(ack) thread = threading.Thread(target=ecusim) sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2]) thread.start() started.wait(timeout=5) starttime = time.time() # may be inaccurate -> on some systems only seconds precision result = GMLAN_RequestDownload(isotpsock, 4, timeout=repeats*tout+0.5) endtime = time.time() + 1 thread.join(timeout=5) assert result print(endtime - starttime) print(tout * (repeats - 1)) assert (endtime - starttime) >= tout * (repeats - 1) = Negative, negative response after response pending started = threading.Event() def ecusim(): isotpsock2.sniff(count=1, timeout=1, started_callback=started.set) pending = GMLAN()/GMLAN_NR(requestServiceId=0x34, returnCode=0x78) isotpsock2.send(pending) nr = GMLAN()/GMLAN_NR(requestServiceId=0x34, returnCode=0x22) isotpsock2.send(nr) thread = threading.Thread(target=ecusim) sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2]) thread.start() started.wait(timeout=5) res = GMLAN_RequestDownload(isotpsock, 4, timeout=0.1) == False thread.join(timeout=5) assert res = 
Negative, timeout after response pending started = threading.Event() def ecusim(): isotpsock2.sniff(count=1, timeout=1, started_callback=started.set) pending = GMLAN()/GMLAN_NR(requestServiceId=0x34, returnCode=0x78) isotpsock2.send(pending) thread = threading.Thread(target=ecusim) sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2]) thread.start() started.wait(timeout=5) res = GMLAN_RequestDownload(isotpsock, 4, timeout=0.1) == False thread.join(timeout=5) assert res = Positive, pending message from different service interferes while pending started = threading.Event() def ecusim(): isotpsock2.sniff(count=1, timeout=1, started_callback=started.set) pending = GMLAN()/GMLAN_NR(requestServiceId=0x34, returnCode=0x78) isotpsock2.send(pending) wrongservice = GMLAN()/GMLAN_NR(requestServiceId=0x36, returnCode=0x78) isotpsock2.send(wrongservice) isotpsock2.send(pending) ack = b"\x74" isotpsock2.send(ack) thread = threading.Thread(target=ecusim) sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2]) thread.start() started.wait(timeout=5) res = GMLAN_RequestDownload(isotpsock, 4, timeout=0.1) == True thread.join(timeout=5) assert res = Positive, negative response from different service interferes while pending started = threading.Event() def ecusim(): isotpsock2.sniff(count=1, timeout=1, started_callback=started.set) pending = GMLAN()/GMLAN_NR(requestServiceId=0x34, returnCode=0x78) isotpsock2.send(pending) wrongservice = GMLAN()/GMLAN_NR(requestServiceId=0x36, returnCode=0x22) isotpsock2.send(wrongservice) isotpsock2.send(pending) ack = b"\x74" isotpsock2.send(ack) thread = threading.Thread(target=ecusim) sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2]) thread.start() started.wait(timeout=5) res = GMLAN_RequestDownload(isotpsock, 4, timeout=0.1) == True thread.join(timeout=5) assert res ################### RETRY = Positive, first: immediate negative response, retry: Positive started = threading.Event() def ecusim(): global ecusimSuccessfullyExecuted 
ecusimSuccessfullyExecuted= True # negative requ = isotpsock2.sniff(count=1, timeout=1, started_callback=started.set) pkt = GMLAN()/GMLAN_RD(memorySize=4) if bytes(requ[0]) != bytes(pkt): ecusimSuccessfullyExecuted = False nr = GMLAN()/GMLAN_NR(requestServiceId=0x34, returnCode=0x22) # positive retry print("retry") requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(nr)) pkt = GMLAN()/GMLAN_RD(memorySize=4) print(requ) if bytes(requ[0]) != bytes(pkt): ecusimSuccessfullyExecuted = False ack = b"\x74" isotpsock2.send(ack) thread = threading.Thread(target=ecusim) sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2]) thread.start() started.wait(timeout=5) res = GMLAN_RequestDownload(isotpsock, 4, timeout=0.1, retry=1) == True thread.join(timeout=5) assert res assert ecusimSuccessfullyExecuted == True ############################################################################## + GMLAN_TransferData Tests ############################################################################## = Positive, short payload, scheme = 4 conf.contribs['GMLAN']['GMLAN_ECU_AddressingScheme'] = 4 payload = b"\x00\x11\x22\x33\x44\x55\x66\x77" ecusimSuccessfullyExecuted = True started = threading.Event() def ecusim(): global ecusimSuccessfullyExecuted ecusimSuccessfullyExecuted= True requ = isotpsock2.sniff(count=1, timeout=1, started_callback=started.set) pkt = GMLAN() / GMLAN_TD(startingAddress=0x40000000, dataRecord=payload) if bytes(requ[0]) != bytes(pkt): ecusimSuccessfullyExecuted = False ack = b"\x76" isotpsock2.send(ack) thread = threading.Thread(target=ecusim) sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2]) thread.start() started.wait(timeout=5) res = GMLAN_TransferData(isotpsock, 0x40000000, payload, timeout=1) == True thread.join(timeout=5) assert res assert ecusimSuccessfullyExecuted == True = Positive, short payload, scheme = 3 conf.contribs['GMLAN']['GMLAN_ECU_AddressingScheme'] = 3 payload = b"\x00\x11\x22\x33\x44\x55\x66\x77" 
ecusimSuccessfullyExecuted = True started = threading.Event() def ecusim(): global ecusimSuccessfullyExecuted ecusimSuccessfullyExecuted= True requ = isotpsock2.sniff(count=1, timeout=1, started_callback=started.set) pkt = GMLAN() / GMLAN_TD(startingAddress=0x400000, dataRecord=payload) if bytes(requ[0]) != bytes(pkt): ecusimSuccessfullyExecuted = False ack = b"\x76" isotpsock2.send(ack) thread = threading.Thread(target=ecusim) sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2]) thread.start() started.wait(timeout=5) res = GMLAN_TransferData(isotpsock, 0x400000, payload, timeout=1) == True thread.join(timeout=5) assert res assert ecusimSuccessfullyExecuted == True = Positive, short payload, scheme = 2 conf.contribs['GMLAN']['GMLAN_ECU_AddressingScheme'] = 2 payload = b"\x00\x11\x22\x33\x44\x55\x66\x77" ecusimSuccessfullyExecuted = True started = threading.Event() def ecusim(): global ecusimSuccessfullyExecuted ecusimSuccessfullyExecuted = True requ = isotpsock2.sniff(count=1, timeout=2, started_callback=started.set) pkt = GMLAN() / GMLAN_TD(startingAddress=0x4000, dataRecord=payload) if bytes(requ[0]) != bytes(pkt): ecusimSuccessfullyExecuted = False ack = b"\x76" isotpsock2.send(ack) thread = threading.Thread(target=ecusim) sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2]) thread.start() started.wait(timeout=5) res = GMLAN_TransferData(isotpsock, 0x4000, payload, timeout=2) == True thread.join(timeout=5) assert res assert ecusimSuccessfullyExecuted == True = Negative, short payload conf.contribs['GMLAN']['GMLAN_ECU_AddressingScheme'] = 4 payload = b"\x00\x11\x22\x33\x44\x55\x66\x77" ecusimSuccessfullyExecuted = True started = threading.Event() def ecusim(): global ecusimSuccessfullyExecuted ecusimSuccessfullyExecuted= True isotpsock2.sniff(count=1, timeout=1, started_callback=started.set) nr = GMLAN() / GMLAN_NR(requestServiceId=0x36, returnCode=0x22) isotpsock2.send(nr) thread = threading.Thread(target=ecusim) sniff(timeout=0.01, 
opened_socket=[isotpsock, isotpsock2]) thread.start() started.wait(timeout=5) res = GMLAN_TransferData(isotpsock, 0x40000000, payload, timeout=1) == False thread.join(timeout=5) assert res = Negative, timeout assert GMLAN_TransferData(isotpsock, 0x4000, payload, timeout=0.1) == False = Positive, long payload conf.contribs['GMLAN']['GMLAN_ECU_AddressingScheme'] = 4 payload = b"\x00\x11\x22\x33\x44\x55\x66\x77" ecusimSuccessfullyExecuted = True started = threading.Event() def ecusim(): global ecusimSuccessfullyExecuted ecusimSuccessfullyExecuted= True requ = isotpsock2.sniff(count=1, timeout=1, started_callback=started.set) pkt = GMLAN() / GMLAN_TD(startingAddress=0x40000000, dataRecord=payload*2) if bytes(requ[0]) != bytes(pkt): ecusimSuccessfullyExecuted = False ack = b"\x76" # second package with inscreased address requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(ack)) pkt = GMLAN() / GMLAN_TD(startingAddress=0x40000010, dataRecord=payload * 2) if bytes(requ[0]) != bytes(pkt): ecusimSuccessfullyExecuted = False ack = b"\x76" isotpsock2.send(ack) thread = threading.Thread(target=ecusim) sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2]) thread.start() started.wait(timeout=5) res = GMLAN_TransferData(isotpsock, 0x40000000, payload*4, maxmsglen=16, timeout=1) == True thread.join(timeout=5) assert res assert ecusimSuccessfullyExecuted == True # = Positive, first part of payload succeeds, second pending, then fails, retry succeeds conf.contribs['GMLAN']['GMLAN_ECU_AddressingScheme'] = 4 payload = b"\x00\x11\x22\x33\x44\x55\x66\x77" started = threading.Event() def ecusim(): isotpsock2.sniff(count=1, timeout=1, started_callback=started.set) ack = b"\x76" # second package with inscreased address isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(ack)) pending = GMLAN() / GMLAN_NR(requestServiceId=0x36, returnCode=0x78) isotpsock2.send(pending) nr = GMLAN() / GMLAN_NR(requestServiceId=0x36, 
returnCode=0x22) isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(nr)) ack = b"\x76" isotpsock2.send(ack) thread = threading.Thread(target=ecusim) sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2]) thread.start() started.wait(timeout=5) res = GMLAN_TransferData(isotpsock, 0x40000000, payload*4, maxmsglen=16, timeout=0.1, retry=1) == True thread.join(timeout=5) assert res ############ = Positive, maxmsglen length check -> message is split automatically conf.contribs['GMLAN']['GMLAN_ECU_AddressingScheme'] = 4 payload = b"\x00\x11\x22\x33\x44\x55\x66\x77" ecusimSuccessfullyExecuted = True sim_started = threading.Event() def ecusim(): global ecusimSuccessfullyExecuted ecusimSuccessfullyExecuted= True requ = isotpsock2.sniff(count=1, timeout=3, started_callback=sim_started.set) pkt = GMLAN() / GMLAN_TD(startingAddress=0x40000000, dataRecord=payload*511+payload[:1]) if len(requ) == 0 or bytes(requ[0]) != bytes(pkt): ecusimSuccessfullyExecuted = False return ack = b"\x76" # second package with inscreased address requ = isotpsock2.sniff(count=1, timeout=3, started_callback=lambda: isotpsock2.send(ack)) pkt = GMLAN() / GMLAN_TD(startingAddress=0x40000FF9, dataRecord=payload[1:]) if len(requ) == 0 or bytes(requ[0]) != bytes(pkt): ecusimSuccessfullyExecuted = False return ack = b"\x76" isotpsock2.send(ack) thread = threading.Thread(target=ecusim) thread.name = "EcuSimulator" + thread.name sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2]) thread.start() sim_started.wait(timeout=5) res = GMLAN_TransferData(isotpsock, 0x40000000, payload*512, maxmsglen=0x1000000, timeout=8) == True thread.join(timeout=5) assert res assert ecusimSuccessfullyExecuted == True ############ Address boundary checks = Positive, highest possible address for scheme conf.contribs['GMLAN']['GMLAN_ECU_AddressingScheme'] = 4 payload = b"\x00\x11\x22\x33\x44\x55\x66\x77" started = threading.Event() def ecusim(): isotpsock2.sniff(count=1, timeout=1, 
started_callback=started.set) ack = b"\x76" isotpsock2.send(ack) thread = threading.Thread(target=ecusim) sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2]) thread.start() started.wait(timeout=5) res = GMLAN_TransferData(isotpsock, 2**32 - 1, payload, timeout=1) == True thread.join(timeout=5) assert res = Negative, invalid address (too large for addressing scheme) conf.contribs['GMLAN']['GMLAN_ECU_AddressingScheme'] = 4 payload = b"\x00\x11\x22\x33\x44\x55\x66\x77" started = threading.Event() def ecusim(): isotpsock2.sniff(count=1, timeout=0.1, started_callback=started.set) ack = b"\x76" isotpsock2.send(ack) thread = threading.Thread(target=ecusim) sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2]) thread.start() started.wait(timeout=5) res = GMLAN_TransferData(isotpsock, 2**32, payload, timeout=0.1) == False thread.join(timeout=5) assert res = Positive, address zero conf.contribs['GMLAN']['GMLAN_ECU_AddressingScheme'] = 4 ecusimSuccessfullyExecuted = True started = threading.Event() def ecusim(): isotpsock2.sniff(count=1, timeout=1, started_callback=started.set) ack = b"\x76" isotpsock2.send(ack) thread = threading.Thread(target=ecusim) sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2]) thread.start() started.wait(timeout=5) res = GMLAN_TransferData(isotpsock, 0x00, payload, timeout=1) == True thread.join(timeout=5) assert res = Negative, negative address conf.contribs['GMLAN']['GMLAN_ECU_AddressingScheme'] = 4 payload = b"\x00\x11\x22\x33\x44\x55\x66\x77" started = threading.Event() def ecusim(): isotpsock2.sniff(count=1, timeout=0.1, started_callback=started.set) ack = b"\x76" isotpsock2.send(ack) thread = threading.Thread(target=ecusim) sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2]) thread.start() started.wait(timeout=5) res = GMLAN_TransferData(isotpsock, -1, payload, timeout=0.1) == False thread.join(timeout=5) assert res ############################################ + GMLAN_TransferPayload Tests 
############################################ = Positive, short payload conf.contribs['GMLAN']['GMLAN_ECU_AddressingScheme'] = 4 payload = b"\x00\x11\x22\x33\x44\x55\x66\x77" ecusimSuccessfullyExecuted = True started = threading.Event() def ecusim(): global ecusimSuccessfullyExecuted ecusimSuccessfullyExecuted= True requ = isotpsock2.sniff(count=1, timeout=1, started_callback=started.set) pkt = GMLAN()/GMLAN_RD(memorySize=len(payload)) if bytes(requ[0]) != bytes(pkt): ecusimSuccessfullyExecuted = False ack = b"\x74" requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(ack)) pkt = GMLAN() / GMLAN_TD(startingAddress=0x40000000, dataRecord=payload) if bytes(requ[0]) != bytes(pkt): ecusimSuccessfullyExecuted = False ack = b"\x76" isotpsock2.send(ack) thread = threading.Thread(target=ecusim) sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2]) thread.start() started.wait(timeout=5) res = GMLAN_TransferPayload(isotpsock, 0x40000000, payload, timeout=1) == True thread.join(timeout=5) assert res assert ecusimSuccessfullyExecuted == True ############################################ + GMLAN_GetSecurityAccess Tests ############################################ = KeyFunction keyfunc = lambda seed : seed - 0x1FBE = Positive scenario, level 1, tests if keyfunction applied properly ecusimSuccessfullyExecuted = True started = threading.Event() def ecusim(): global ecusimSuccessfullyExecuted ecusimSuccessfullyExecuted= True # wait for request requ = isotpsock2.sniff(count=1, timeout=1, started_callback=started.set) pkt = GMLAN()/GMLAN_SA(subfunction=1) if bytes(requ[0]) != bytes(pkt): ecusimSuccessfullyExecuted = False seedmsg = GMLAN()/GMLAN_SAPR(subfunction=1, securitySeed=0xdead) # wait for key requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(seedmsg)) pkt = GMLAN()/GMLAN_SA(subfunction=2, securityKey=0xbeef) if bytes(requ[0]) != bytes(pkt): ecusimSuccessfullyExecuted = False nr = GMLAN() / 
GMLAN_NR(requestServiceId=0x27, returnCode=0x35) isotpsock2.send(nr) else: pr = GMLAN()/GMLAN_SAPR(subfunction=2) isotpsock2.send(pr) thread = threading.Thread(target=ecusim) sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2]) thread.start() started.wait(timeout=5) res = GMLAN_GetSecurityAccess(isotpsock, keyfunc, level=1, timeout=0.1) == True thread.join(timeout=5) assert res assert ecusimSuccessfullyExecuted == True = Positive scenario, level 3 ecusimSuccessfullyExecuted = True started = threading.Event() def ecusim(): global ecusimSuccessfullyExecuted ecusimSuccessfullyExecuted= True # wait for request requ = isotpsock2.sniff(count=1, timeout=1, started_callback=started.set) pkt = GMLAN()/GMLAN_SA(subfunction=3) if bytes(requ[0]) != bytes(pkt): ecusimSuccessfullyExecuted = False seedmsg = GMLAN()/GMLAN_SAPR(subfunction=3, securitySeed=0xdead) # wait for key requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(seedmsg)) pkt = GMLAN()/GMLAN_SA(subfunction=4, securityKey=0xbeef) if bytes(requ[0]) != bytes(pkt): ecusimSuccessfullyExecuted = False nr = GMLAN() / GMLAN_NR(requestServiceId=0x27, returnCode=0x35) isotpsock2.send(nr) else: pr = GMLAN()/GMLAN_SAPR(subfunction=4) isotpsock2.send(pr) thread = threading.Thread(target=ecusim) sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2]) thread.start() started.wait(timeout=5) res = GMLAN_GetSecurityAccess(isotpsock, keyfunc, level=3, timeout=0.1) == True thread.join(timeout=5) assert res assert ecusimSuccessfullyExecuted == True = Negative scenario, invalid password ecusimSuccessfullyExecuted = True started = threading.Event() def ecusim(): global ecusimSuccessfullyExecuted ecusimSuccessfullyExecuted= True # wait for request requ = isotpsock2.sniff(count=1, timeout=1, started_callback=started.set) pkt = GMLAN()/GMLAN_SA(subfunction=1) if bytes(requ[0]) != bytes(pkt): ecusimSuccessfullyExecuted = False seedmsg = GMLAN()/GMLAN_SAPR(subfunction=1, securitySeed=0xdead) # wait for 
key requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(seedmsg)) pkt = GMLAN()/GMLAN_SA(subfunction=2, securityKey=0xbabe) if bytes(requ[0]) != bytes(pkt): nr = GMLAN() / GMLAN_NR(requestServiceId=0x27, returnCode=0x35) isotpsock2.send(nr) else: ecusimSuccessfullyExecuted = False pr = GMLAN()/GMLAN_SAPR(subfunction=2) isotpsock2.send(pr) thread = threading.Thread(target=ecusim) sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2]) thread.start() started.wait(timeout=5) res = GMLAN_GetSecurityAccess(isotpsock, keyfunc, level=1, timeout=0.1) == False thread.join(timeout=5) assert res assert ecusimSuccessfullyExecuted == True = invalid level (not an odd number) assert GMLAN_GetSecurityAccess(isotpsock, keyfunc, level=2, timeout=1) == False = zero seed started = threading.Event() def ecusim(): # wait for request isotpsock2.sniff(count=1, timeout=1, started_callback=started.set) seedmsg = GMLAN()/GMLAN_SAPR(subfunction=1, securitySeed=0x0000) isotpsock2.send(seedmsg) thread = threading.Thread(target=ecusim) sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2]) thread.start() started.wait(timeout=5) res = GMLAN_GetSecurityAccess(isotpsock, keyfunc, level=1, timeout=0.1) == True thread.join(timeout=5) assert res ############### retry = Positive scenario, request timeout, retry works started = threading.Event() def ecusim(): # timeout requ = isotpsock2.sniff(count=1, timeout=0.1, started_callback=started.set) # wait for request requ = isotpsock2.sniff(count=1, timeout=3) seedmsg = GMLAN()/GMLAN_SAPR(subfunction=1, securitySeed=0xdead) # wait for key requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(seedmsg)) pkt = GMLAN()/GMLAN_SA(subfunction=2, securityKey=0xbeef) pr = GMLAN()/GMLAN_SAPR(subfunction=2) isotpsock2.send(pr) thread = threading.Thread(target=ecusim) sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2]) thread.start() started.wait(timeout=5) res = 
GMLAN_GetSecurityAccess(isotpsock, keyfunc, level=1, timeout=0.1, retry=1) == True thread.join(timeout=5) assert res = Positive scenario, keysend timeout, retry works started = threading.Event() def ecusim(): # wait for request requ = isotpsock2.sniff(count=1, timeout=1, started_callback=started.set) seedmsg = GMLAN()/GMLAN_SAPR(subfunction=1, securitySeed=0xdead) # timeout requ = isotpsock2.sniff(count=1, timeout=0.1, started_callback=lambda:isotpsock2.send(seedmsg)) # retry from start requ = isotpsock2.sniff(count=1, timeout=3) seedmsg = GMLAN()/GMLAN_SAPR(subfunction=1, securitySeed=0xdead) # wait for key requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(seedmsg)) pr = GMLAN()/GMLAN_SAPR(subfunction=2) isotpsock2.send(pr) thread = threading.Thread(target=ecusim) sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2]) thread.start() started.wait(timeout=5) res = GMLAN_GetSecurityAccess(isotpsock, keyfunc, level=1, timeout=0.1, retry=1) == True thread.join(timeout=5) assert res = Positive scenario, request error, retry works started = threading.Event() def ecusim(): # wait for request requ = isotpsock2.sniff(count=1, timeout=1, started_callback=started.set) nr = GMLAN() / GMLAN_NR(requestServiceId=0x27, returnCode=0x37) # wait for request requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(nr)) seedmsg = GMLAN()/GMLAN_SAPR(subfunction=1, securitySeed=0xdead) # wait for key requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(seedmsg)) pkt = GMLAN()/GMLAN_SA(subfunction=2, securityKey=0xbeef) pr = GMLAN()/GMLAN_SAPR(subfunction=2) isotpsock2.send(pr) thread = threading.Thread(target=ecusim) sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2]) thread.start() started.wait(timeout=5) res = GMLAN_GetSecurityAccess(isotpsock, keyfunc, level=1, timeout=0.1, retry=1) == True thread.join(timeout=5) assert res 
############################################################################## + GMLAN_InitDiagnostics Tests ############################################################################## = sequence of the correct messages ecusimSuccessfullyExecuted = True started = threading.Event() def ecusim(): global ecusimSuccessfullyExecuted ecusimSuccessfullyExecuted= True # DisableNormalCommunication requ = isotpsock2.sniff(count=1, timeout=1, started_callback=started.set) pkt = GMLAN(b"\x28") if bytes(requ[0]) != bytes(pkt): ecusimSuccessfullyExecuted = False ack = b"\x68" # ReportProgrammedState requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(ack)) pkt = GMLAN(b"\xa2") if bytes(requ[0]) != bytes(pkt): ecusimSuccessfullyExecuted = False ack = GMLAN()/GMLAN_RPSPR(programmedState=0) # ProgrammingMode requestProgramming requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(ack)) pkt = GMLAN() / GMLAN_PM(subfunction=0x1) if bytes(requ[0]) != bytes(pkt): ecusimSuccessfullyExecuted = False ack = GMLAN(b"\xe5") # InitiateProgramming enableProgramming requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(ack)) pkt = GMLAN() / GMLAN_PM(subfunction=0x3) if bytes(requ[0]) != bytes(pkt): ecusimSuccessfullyExecuted = False thread = threading.Thread(target=ecusim) sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2]) thread.start() started.wait(timeout=5) res = GMLAN_InitDiagnostics(isotpsock, timeout=0.1, unittest=True) == True thread.join(timeout=5) assert res assert ecusimSuccessfullyExecuted == True = sequence of the correct messages, disablenormalcommunication as broadcast ecusimSuccessfullyExecuted = True started = threading.Event() broadcastsender = TestSocket(CAN) broadcastrcv = TestSocket(CAN) broadcastsender.pair(broadcastrcv) def ecusim(): global ecusimSuccessfullyExecuted ecusimSuccessfullyExecuted= True print("DisableNormalCommunication") requ = broadcastrcv.sniff(count=1, 
timeout=2, started_callback=started.set) assert len(requ) >= 1 if bytes(requ[0].data)[0:3] != b"\xfe\x01\x28": ecusimSuccessfullyExecuted = False print("ReportProgrammedState") requ = isotpsock2.sniff(count=1, timeout=2) pkt = GMLAN(b"\xa2") if bytes(requ[0]) != bytes(pkt): ecusimSuccessfullyExecuted = False ack = GMLAN()/GMLAN_RPSPR(programmedState=0) print("ProgrammingMode requestProgramming") requ = isotpsock2.sniff(count=1, timeout=3, started_callback=lambda: isotpsock2.send(ack)) pkt = GMLAN() / GMLAN_PM(subfunction=0x1) if bytes(requ[0]) != bytes(pkt): ecusimSuccessfullyExecuted = False ack = GMLAN(b"\xe5") print("InitiateProgramming enableProgramming") requ = isotpsock2.sniff(count=1, timeout=3, started_callback=lambda: isotpsock2.send(ack)) pkt = GMLAN() / GMLAN_PM(subfunction=0x3) if bytes(requ[0]) != bytes(pkt): ecusimSuccessfullyExecuted = False thread = threading.Thread(target=ecusim) sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2]) thread.start() started.wait(timeout=5) res = GMLAN_InitDiagnostics(isotpsock, broadcast_socket=GMLAN_BroadcastSocket(broadcastsender), timeout=5, unittest=True) == True thread.join(timeout=5) assert res assert ecusimSuccessfullyExecuted == True ######## timeout = timeout DisableNormalCommunication ecusimSuccessfullyExecuted = True started = threading.Event() def ecusim(): global ecusimSuccessfullyExecuted ecusimSuccessfullyExecuted= True # DisableNormalCommunication requ = isotpsock2.sniff(count=1, timeout=1, started_callback=started.set) pkt = GMLAN(b"\x28") if bytes(requ[0]) != bytes(pkt): ecusimSuccessfullyExecuted = False thread = threading.Thread(target=ecusim) sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2]) thread.start() started.wait(timeout=5) res = GMLAN_InitDiagnostics(isotpsock, timeout=0.1, unittest=True) == False thread.join(timeout=5) assert res assert ecusimSuccessfullyExecuted == True = timeout ReportProgrammedState ecusimSuccessfullyExecuted = True started = threading.Event() def 
ecusim(): global ecusimSuccessfullyExecuted ecusimSuccessfullyExecuted= True # DisableNormalCommunication requ = isotpsock2.sniff(count=1, timeout=1, started_callback=started.set) pkt = GMLAN(b"\x28") if bytes(requ[0]) != bytes(pkt): ecusimSuccessfullyExecuted = False ack = b"\x68" # ReportProgrammedState requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(ack)) pkt = GMLAN(b"\xa2") if bytes(requ[0]) != bytes(pkt): ecusimSuccessfullyExecuted = False ack = GMLAN()/GMLAN_RPSPR(programmedState=0) isotpsock2.send(ack) thread = threading.Thread(target=ecusim) sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2]) thread.start() started.wait(timeout=5) res = GMLAN_InitDiagnostics(isotpsock, timeout=0.1, unittest=True) == False thread.join(timeout=5) assert res assert ecusimSuccessfullyExecuted == True = timeout ProgrammingMode requestProgramming ecusimSuccessfullyExecuted = True started = threading.Event() def ecusim(): global ecusimSuccessfullyExecuted ecusimSuccessfullyExecuted= True # DisableNormalCommunication requ = isotpsock2.sniff(count=1, timeout=1, started_callback=started.set) pkt = GMLAN(b"\x28") if bytes(requ[0]) != bytes(pkt): ecusimSuccessfullyExecuted = False ack = b"\x68" # ReportProgrammedState requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(ack)) pkt = GMLAN(b"\xa2") if bytes(requ[0]) != bytes(pkt): ecusimSuccessfullyExecuted = False ack = GMLAN()/GMLAN_RPSPR(programmedState=0) # ProgrammingMode requestProgramming requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(ack)) pkt = GMLAN() / GMLAN_PM(subfunction=0x1) if bytes(requ[0]) != bytes(pkt): ecusimSuccessfullyExecuted = False thread = threading.Thread(target=ecusim) sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2]) thread.start() started.wait(timeout=5) res = GMLAN_InitDiagnostics(isotpsock, timeout=0.1, unittest=True) == False thread.join(timeout=5) assert res assert 
ecusimSuccessfullyExecuted == True

###### negative response
= negative response DisableNormalCommunication
# The simulated ECU answers the first request with a negative response and
# GMLAN_InitDiagnostics is expected to return False.
ecusimSuccessfullyExecuted = True
started = threading.Event()
def ecusim():
    global ecusimSuccessfullyExecuted
    ecusimSuccessfullyExecuted = True
    # DisableNormalCommunication
    requ = isotpsock2.sniff(count=1, timeout=1, started_callback=started.set)
    pkt = GMLAN(b"\x28")
    # An empty capture must clear the flag; indexing it would only raise
    # IndexError inside this thread and leave the flag set to True.
    if len(requ) == 0 or bytes(requ[0]) != bytes(pkt):
        ecusimSuccessfullyExecuted = False
    ack = GMLAN() / GMLAN_NR(requestServiceId=0x28, returnCode=0x12)
    isotpsock2.send(ack)

thread = threading.Thread(target=ecusim)
# Short sniff on both test sockets before the simulator starts
# (presumably drains packets left over from the previous test -- confirm).
sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2])
thread.start()
# Do not send the request before the simulator has started sniffing.
started.wait(timeout=5)
res = GMLAN_InitDiagnostics(isotpsock, timeout=0.1, unittest=True) == False
thread.join(timeout=5)
assert res
assert ecusimSuccessfullyExecuted == True

###### retry tests
= sequence of the correct messages, retry set
# In the multi-step simulators below, the comment above each sniff names the
# request that sniff waits for. The reply to the previously received request
# is sent from the started_callback of that sniff.
started = threading.Event()
def ecusim():
    # DisableNormalCommunication
    requ = isotpsock2.sniff(count=1, timeout=1, started_callback=started.set)
    ack = b"\x68"
    # ReportProgrammedState
    requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(ack))
    ack = GMLAN()/GMLAN_RPSPR(programmedState=0)
    # ProgrammingMode requestProgramming
    requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(ack))
    ack = GMLAN(b"\xe5")
    # InitiateProgramming enableProgramming
    requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(ack))

thread = threading.Thread(target=ecusim)
sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2])
thread.start()
started.wait(timeout=5)
res = GMLAN_InitDiagnostics(isotpsock, timeout=1, retry=0, unittest=True) == True
# Join before asserting so that a failing assertion cannot leave the
# simulator thread sniffing on isotpsock2 while the next test runs.
thread.join(timeout=5)
assert res

= negative response, make sure no retries are made
ecusimSuccessfullyExecuted = True
started = threading.Event()
def ecusim():
    global ecusimSuccessfullyExecuted
    ecusimSuccessfullyExecuted = True
    requ = isotpsock2.sniff(count=1, timeout=0.1, started_callback=started.set)
    ack = GMLAN() / GMLAN_NR(requestServiceId=0x28, returnCode=0x12)
    requ = isotpsock2.sniff(count=1, timeout=0.1, started_callback=lambda:isotpsock2.send(ack))
    # With retry=0 nothing may be captured after the negative response.
    if len(requ) != 0:
        ecusimSuccessfullyExecuted = False

thread = threading.Thread(target=ecusim)
sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2])
thread.start()
started.wait(timeout=5)
res = GMLAN_InitDiagnostics(isotpsock, timeout=0.1, retry=0, unittest=True) == False
thread.join(timeout=5)
assert res
assert ecusimSuccessfullyExecuted == True

= first fail at DisableNormalCommunication, then sequence of the correct messages
started = threading.Event()
def ecusim():
    # First request is answered with a negative response
    requ = isotpsock2.sniff(count=1, timeout=1, started_callback=started.set)
    ack = GMLAN() / GMLAN_NR(requestServiceId=0x28, returnCode=0x12)
    # DisableNormalCommunication
    requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(ack))
    ack = b"\x68"
    # ReportProgrammedState
    requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(ack))
    ack = GMLAN()/GMLAN_RPSPR(programmedState=0)
    # ProgrammingMode requestProgramming
    requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(ack))
    ack = GMLAN(b"\xe5")
    # InitiateProgramming enableProgramming
    requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(ack))

thread = threading.Thread(target=ecusim)
sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2])
thread.start()
started.wait(timeout=5)
res = GMLAN_InitDiagnostics(isotpsock, timeout=0.1, retry=1, unittest=True) == True
thread.join(timeout=5)
assert res

= first fail at ReportProgrammedState, then sequence of the correct messages
started = threading.Event()
def ecusim():
    # DisableNormalCommunication
    requ = isotpsock2.sniff(count=1, timeout=1, started_callback=started.set)
    ack = b"\x68"
    # Fail
    requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(ack))
    ack = GMLAN() / GMLAN_NR(requestServiceId=0xA2, returnCode=0x12)
    # DisableNormalCommunication
    requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(ack))
    ack = b"\x68"
    # ReportProgrammedState
    requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(ack))
    ack = GMLAN()/GMLAN_RPSPR(programmedState=0)
    # ProgrammingMode requestProgramming
    requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(ack))
    ack = GMLAN(b"\xe5")
    # InitiateProgramming enableProgramming
    requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(ack))

thread = threading.Thread(target=ecusim)
sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2])
thread.start()
started.wait(timeout=5)
res = GMLAN_InitDiagnostics(isotpsock, timeout=1, retry=1, unittest=True) == True
thread.join(timeout=5)
assert res

= first fail at ProgrammingMode requestProgramming, then sequence of the correct messages
started = threading.Event()
def ecusim():
    # DisableNormalCommunication
    requ = isotpsock2.sniff(count=1, timeout=1, started_callback=started.set)
    ack = b"\x68"
    # ReportProgrammedState
    requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(ack))
    ack = GMLAN()/GMLAN_RPSPR(programmedState=0)
    # Fail
    requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(ack))
    ack = GMLAN() / GMLAN_NR(requestServiceId=0xA5, returnCode=0x12)
    # DisableNormalCommunication
    requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(ack))
    ack = b"\x68"
    # ReportProgrammedState
    requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(ack))
    ack = GMLAN()/GMLAN_RPSPR(programmedState=0)
    # ProgrammingMode requestProgramming
    requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(ack))
    ack = GMLAN(b"\xe5")
    # Unlike the steps above, this reply is sent directly, before the last sniff
    isotpsock2.send(ack)
    # InitiateProgramming enableProgramming
    requ = isotpsock2.sniff(count=1, timeout=1)

thread = threading.Thread(target=ecusim)
sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2])
thread.start()
started.wait(timeout=5)
res = GMLAN_InitDiagnostics(isotpsock, timeout=1, retry=1, unittest=True) == True
thread.join(timeout=5)
assert res

= fail twice
ecusimSuccessfullyExecuted = True
started = threading.Event()
def ecusim():
    global ecusimSuccessfullyExecuted
    ecusimSuccessfullyExecuted = True
    requ = isotpsock2.sniff(count=1, timeout=0.1, started_callback=started.set)
    ack = GMLAN() / GMLAN_NR(requestServiceId=0x28, returnCode=0x12)
    requ = isotpsock2.sniff(count=1, timeout=0.1, started_callback=lambda:isotpsock2.send(ack))
    ack = GMLAN() / GMLAN_NR(requestServiceId=0x28, returnCode=0x12)
    requ = isotpsock2.sniff(count=1, timeout=0.1, started_callback=lambda:isotpsock2.send(ack))
    # After the second negative response (retry=1) nothing more may be captured.
    if len(requ) != 0:
        ecusimSuccessfullyExecuted = False

thread = threading.Thread(target=ecusim)
sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2])
thread.start()
started.wait(timeout=5)
res = GMLAN_InitDiagnostics(isotpsock, timeout=0.1, retry=1, unittest=True) == False
thread.join(timeout=5)
assert res
assert ecusimSuccessfullyExecuted == True

##############################################################################
+ GMLAN_ReadMemoryByAddress Tests
##############################################################################
= Positive, short length, scheme = 4
conf.contribs['GMLAN']['GMLAN_ECU_AddressingScheme'] = 4
payload = b"\x00\x11\x22\x33\x44\x55\x66\x77"
ecusimSuccessfullyExecuted = True
started = threading.Event()
def ecusim():
    global ecusimSuccessfullyExecuted
    ecusimSuccessfullyExecuted = True
    requ = isotpsock2.sniff(count=1, timeout=1, started_callback=started.set)
    pkt = GMLAN() / GMLAN_RMBA(memoryAddress=0x0, memorySize=0x8)
    # An empty capture must clear the flag; indexing it would only raise
    # IndexError inside this thread and leave the flag set to True.
    if len(requ) == 0 or bytes(requ[0]) != bytes(pkt):
        ecusimSuccessfullyExecuted = False
    ack = GMLAN() / GMLAN_RMBAPR(memoryAddress=0x0, dataRecord=payload)
    isotpsock2.send(ack)

thread = threading.Thread(target=ecusim)
sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2])
thread.start()
started.wait(timeout=5)
# The utility is expected to return exactly the dataRecord sent by the simulator.
res = GMLAN_ReadMemoryByAddress(isotpsock, 0x0, 0x8, timeout=1) == payload
thread.join(timeout=5)
assert res
assert ecusimSuccessfullyExecuted == True

= Negative, negative response
conf.contribs['GMLAN']['GMLAN_ECU_AddressingScheme'] = 4
payload = b"\x00\x11\x22\x33\x44\x55\x66\x77"
started = threading.Event()
def ecusim():
    requ = isotpsock2.sniff(count=1, timeout=1, started_callback=started.set)
    ack = GMLAN() / GMLAN_NR(requestServiceId=0x23, returnCode=0x31)
    isotpsock2.send(ack)

thread = threading.Thread(target=ecusim)
sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2])
thread.start()
started.wait(timeout=5)
# A negative response is expected to yield None instead of data.
res = GMLAN_ReadMemoryByAddress(isotpsock, 0x0, 0x8, timeout=1) is None
thread.join(timeout=5)
assert res

= Negative, timeout
# No simulator is started here, so the request stays unanswered.
assert GMLAN_ReadMemoryByAddress(isotpsock, 0x0, 0x8, timeout=0.01) is None

###### RETRY
= Positive, negative response, retry succeeds
conf.contribs['GMLAN']['GMLAN_ECU_AddressingScheme'] = 4
payload = b"\x00\x11\x22\x33\x44\x55\x66\x77"
started = threading.Event()
def ecusim():
    requ = isotpsock2.sniff(count=1, timeout=1, started_callback=started.set)
    ack = GMLAN() / GMLAN_NR(requestServiceId=0x23, returnCode=0x31)
    # The negative response goes out once the sniff for the retried request has started
    requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(ack))
    ack = GMLAN() / GMLAN_RMBAPR(memoryAddress=0x0, dataRecord=payload)
    isotpsock2.send(ack)

thread = threading.Thread(target=ecusim)
sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2])
thread.start()
started.wait(timeout=5)
res = GMLAN_ReadMemoryByAddress(isotpsock, 0x0, 0x8, timeout=1, retry=1) == payload
thread.join(timeout=5)
assert res

+ Cleanup

= Delete TestSockets
cleanup_testsockets()