% Regression tests for EcuAnsweringMachine + Configuration ~ conf = Imports from test.testsocket import TestSocket, cleanup_testsockets ############ ############ + Load general modules = Load contribution layer load_contrib("automotive.uds", globals_dict=globals()) load_contrib("automotive.ecu", globals_dict=globals()) load_contrib("automotive.uds_ecu_states", globals_dict=globals()) ecu = TestSocket(UDS) tester = TestSocket(UDS) ecu.pair(tester) + Simulator tests = Simple check with RDBI and Negative Response example_responses = \ [EcuResponse([EcuState(session=1)], responses=UDS() / UDS_RDBIPR(dataIdentifier=0x1234) / Raw(b"deadbeef"))] success = False answering_machine = EcuAnsweringMachine(supported_responses=example_responses, main_socket=ecu, basecls=UDS, verbose=False) sim = threading.Thread(target=answering_machine, kwargs={'timeout': 60, 'stop_filter': lambda p: p.service==0xff}) sim.start() try: resp = tester.sr1(UDS()/UDS_RDBI(identifiers=[0x123]), timeout=1, verbose=False) assert resp.negativeResponseCode == 0x10 assert resp.requestServiceId == 34 resp = tester.sr1(UDS(service=0x22), timeout=1, verbose=False) assert resp.negativeResponseCode == 0x10 assert resp.requestServiceId == 34 resp = tester.sr1(UDS() / UDS_RDBI(identifiers=[0x1234]), timeout=1, verbose=False) assert resp.service == 0x62 assert resp.dataIdentifier == 0x1234 assert resp.load == b"deadbeef" success = True except Exception as ex: print(ex) finally: tester.send(UDS(service=0xff)) sim.join(timeout=10) assert success = Simple check with different Sessions example_responses = \ [EcuResponse(EcuState(session=2), responses=UDS() / UDS_RDBIPR(dataIdentifier=2) / Raw(b"deadbeef1")), EcuResponse(EcuState(session=[3, 4]), responses=UDS() / UDS_RDBIPR(dataIdentifier=3) / Raw(b"deadbeef2")), EcuResponse(EcuState(session=[5, 6, 7]), responses=UDS() / UDS_RDBIPR(dataIdentifier=5) / Raw(b"deadbeef3")), EcuResponse(EcuState(session=[8, 9]), responses=UDS() / UDS_RDBIPR(dataIdentifier=9) / Raw(b"deadbeef4"))] success = False answering_machine = EcuAnsweringMachine(supported_responses=example_responses, main_socket=ecu, basecls=UDS) sim = threading.Thread(target=answering_machine, kwargs={'timeout': 60, 'stop_filter': lambda p: p.service==0xff}) sim.start() try: resp = tester.sr1(UDS()/UDS_RDBI(identifiers=[2, 3, 5, 9]), timeout=1, verbose=False) assert resp.negativeResponseCode == 0x10 assert resp.requestServiceId == 34 answering_machine.state.session = 2 resp = tester.sr1(UDS() / UDS_RDBI(identifiers=[2, 3, 5, 9]), timeout=1, verbose=False) assert resp.service == 0x62 assert resp.dataIdentifier == 2 assert resp.load == b"deadbeef1" answering_machine.state.session = 4 resp = tester.sr1(UDS() / UDS_RDBI(identifiers=[2, 3, 5, 9]), timeout=1, verbose=False) assert resp.service == 0x62 assert resp.dataIdentifier == 3 assert resp.load == b"deadbeef2" answering_machine.state.session = 6 resp = tester.sr1(UDS() / UDS_RDBI(identifiers=[2, 3, 5, 9]), timeout=1, verbose=False) assert resp.service == 0x62 assert resp.dataIdentifier == 5 assert resp.load == b"deadbeef3" answering_machine.state.session = 9 resp = tester.sr1(UDS() / UDS_RDBI(identifiers=[2, 3, 5, 9]), timeout=1, verbose=False) assert resp.service == 0x62 assert resp.dataIdentifier == 9 assert resp.load == b"deadbeef4" success = True except Exception as ex: print(ex) finally: tester.send(UDS(service=0xff)) sim.join(timeout=10) assert success = Simple check with different Sessions and diagnosticSessionControl example_responses = \ [EcuResponse(EcuState(session=2), responses=UDS() / UDS_RDBIPR(dataIdentifier=2) / Raw(b"deadbeef1")), EcuResponse(EcuState(session=range(3,5)), responses=UDS() / UDS_RDBIPR(dataIdentifier=3) / Raw(b"deadbeef2")), EcuResponse(EcuState(session=[5,6,7]), responses=UDS() / UDS_RDBIPR(dataIdentifier=5) / Raw(b"deadbeef3")), EcuResponse(EcuState(session=9), responses=UDS() / UDS_RDBIPR(dataIdentifier=9) / Raw(b"deadbeef4")), EcuResponse([EcuState(), EcuState(session=range(0,8))], responses=UDS() / UDS_DSCPR(diagnosticSessionType=1, sessionParameterRecord=b"dead")), EcuResponse([EcuState(), EcuState(session=range(0,8))], responses=UDS() / UDS_DSCPR(diagnosticSessionType=2, sessionParameterRecord=b"dead")), EcuResponse([EcuState(), EcuState(session=range(0,8))], responses=UDS() / UDS_DSCPR(diagnosticSessionType=3, sessionParameterRecord=b"dead")), EcuResponse([EcuState(), EcuState(session=range(0,8))], responses=UDS() / UDS_DSCPR(diagnosticSessionType=4, sessionParameterRecord=b"dead")), EcuResponse([EcuState(), EcuState(session=range(0,8))], responses=UDS() / UDS_DSCPR(diagnosticSessionType=5, sessionParameterRecord=b"dead")), EcuResponse([EcuState(), EcuState(session=range(0,8))], responses=UDS() / UDS_DSCPR(diagnosticSessionType=6, sessionParameterRecord=b"dead")), EcuResponse([EcuState(), EcuState(session=range(0,8))], responses=UDS() / UDS_DSCPR(diagnosticSessionType=7, sessionParameterRecord=b"dead")), EcuResponse([EcuState(), EcuState(session=range(0,8))], responses=UDS() / UDS_DSCPR(diagnosticSessionType=8, sessionParameterRecord=b"dead")), EcuResponse([EcuState(), EcuState(session=range(8,10))], responses=UDS() / UDS_DSCPR(diagnosticSessionType=9, sessionParameterRecord=b"dead1")), EcuResponse([EcuState(), EcuState(session=range(8,10))], responses=UDS() / UDS_DSCPR(diagnosticSessionType=9, sessionParameterRecord=b"dead2")), EcuResponse(EcuState(session=range(0,255)), responses=UDS() / UDS_NR(negativeResponseCode=0x7f, requestServiceId=0x10))] success = False answering_machine = EcuAnsweringMachine(supported_responses=example_responses, main_socket=ecu, basecls=UDS) sim = threading.Thread(target=answering_machine, kwargs={'timeout': 60, 'stop_filter': lambda p: p.service==0xff}) sim.start() try: resp = tester.sr1(UDS()/UDS_RDBI(identifiers=[2, 3, 5, 9]), timeout=1, verbose=False) assert resp.negativeResponseCode == 0x10 assert resp.requestServiceId == 34 resp = tester.sr1(UDS()/UDS_DSC(diagnosticSessionType=2), timeout=1, verbose=False) assert resp.service == 0x50 assert resp.diagnosticSessionType == 2 assert resp.sessionParameterRecord == b"dead" resp = tester.sr1(UDS() / UDS_RDBI(identifiers=[2, 3, 5, 9]), timeout=1, verbose=False) assert resp.service == 0x62 assert resp.dataIdentifier == 2 assert resp.load == b"deadbeef1" resp = tester.sr1(UDS()/UDS_DSC(diagnosticSessionType=4), timeout=1, verbose=False) assert resp.service == 0x50 assert resp.diagnosticSessionType == 4 assert resp.sessionParameterRecord == b"dead" resp = tester.sr1(UDS() / UDS_RDBI(identifiers=[2, 3, 5, 9]), timeout=1, verbose=False) assert resp.service == 0x62 assert resp.dataIdentifier == 3 assert resp.load == b"deadbeef2" resp = tester.sr1(UDS()/UDS_DSC(diagnosticSessionType=6), timeout=1, verbose=False) assert resp.service == 0x50 assert resp.diagnosticSessionType == 6 assert resp.sessionParameterRecord == b"dead" resp = tester.sr1(UDS() / UDS_RDBI(identifiers=[2, 3, 5, 9]), timeout=1, verbose=False) assert resp.service == 0x62 assert resp.dataIdentifier == 5 assert resp.load == b"deadbeef3" resp = tester.sr1(UDS()/UDS_DSC(diagnosticSessionType=8), timeout=1, verbose=False) assert resp.service == 0x50 assert resp.diagnosticSessionType == 8 assert resp.sessionParameterRecord == b"dead" resp = tester.sr1(UDS() / UDS_DSC(diagnosticSessionType=9), timeout=1, verbose=False) assert resp.service == 0x50 assert resp.diagnosticSessionType == 9 assert resp.sessionParameterRecord == b"dead1" resp = tester.sr1(UDS() / UDS_RDBI(identifiers=[2, 3, 5, 9]), timeout=1, verbose=False) assert resp.service == 0x62 assert resp.dataIdentifier == 9 assert resp.load == b"deadbeef4" success = True except Exception as ex: print(ex) finally: tester.send(UDS(service=0xff)) sim.join(timeout=10) assert success = Simple check with different Sessions and diagnosticSessionControl and answers hook def custom_answers(resp, req): if req.service + 0x40 != resp.service: return False if hasattr(req, "diagnosticSessionType"): if 0 < req.diagnosticSessionType <= 8: resp.diagnosticSessionType = req.diagnosticSessionType return resp.answers(req) return False example_responses = \ [EcuResponse(EcuState(session=2), responses=UDS() / UDS_RDBIPR(dataIdentifier=2) / Raw(b"deadbeef1")), EcuResponse(EcuState(session=range(3,5)), responses=UDS() / UDS_RDBIPR(dataIdentifier=3) / Raw(b"deadbeef2")), EcuResponse(EcuState(session=[5,6,7]), responses=UDS() / UDS_RDBIPR(dataIdentifier=5) / Raw(b"deadbeef3")), EcuResponse(EcuState(session=[9, 10]), responses=UDS() / UDS_RDBIPR(dataIdentifier=9) / Raw(b"deadbeef4")), EcuResponse(EcuState(session=range(0,8)), responses=UDS() / UDS_DSCPR(diagnosticSessionType=1, sessionParameterRecord=b"dead"), answers=custom_answers), EcuResponse(EcuState(session=range(8,10)), responses=UDS() / UDS_DSCPR(diagnosticSessionType=9, sessionParameterRecord=b"dead1")), EcuResponse(EcuState(session=range(8,10)), responses=UDS() / UDS_DSCPR(diagnosticSessionType=9, sessionParameterRecord=b"dead2")), EcuResponse(EcuState(session=range(0,255)), responses=UDS() / UDS_NR(negativeResponseCode=0x7f, requestServiceId=0x10))] success = False answering_machine = EcuAnsweringMachine(supported_responses=example_responses, main_socket=ecu, basecls=UDS) sim = threading.Thread(target=answering_machine, kwargs={'timeout': 60, 'stop_filter': lambda p: p.service==0xff}) sim.start() try: resp = tester.sr1(UDS()/UDS_RDBI(identifiers=[2, 3, 5, 9]), timeout=1, verbose=False) assert resp.negativeResponseCode == 0x10 assert resp.requestServiceId == 34 resp = tester.sr1(UDS()/UDS_DSC(diagnosticSessionType=2), timeout=1, verbose=False) assert resp.service == 0x50 assert resp.diagnosticSessionType == 2 assert resp.sessionParameterRecord == b"dead" resp = tester.sr1(UDS() / UDS_RDBI(identifiers=[2, 3, 5, 9]), timeout=1, verbose=False) assert resp.service == 0x62 assert resp.dataIdentifier == 2 assert resp.load == b"deadbeef1" resp = tester.sr1(UDS()/UDS_DSC(diagnosticSessionType=4), timeout=1, verbose=False) assert resp.service == 0x50 assert resp.diagnosticSessionType == 4 assert resp.sessionParameterRecord == b"dead" resp = tester.sr1(UDS() / UDS_RDBI(identifiers=[2, 3, 5, 9]), timeout=1, verbose=False) assert resp.service == 0x62 assert resp.dataIdentifier == 3 assert resp.load == b"deadbeef2" resp = tester.sr1(UDS()/UDS_DSC(diagnosticSessionType=6), timeout=1, verbose=False) assert resp.service == 0x50 assert resp.diagnosticSessionType == 6 assert resp.sessionParameterRecord == b"dead" resp = tester.sr1(UDS() / UDS_RDBI(identifiers=[2, 3, 5, 9]), timeout=1, verbose=False) assert resp.service == 0x62 assert resp.dataIdentifier == 5 assert resp.load == b"deadbeef3" resp = tester.sr1(UDS()/UDS_DSC(diagnosticSessionType=8), timeout=1, verbose=False) assert resp.service == 0x50 assert resp.diagnosticSessionType == 8 assert resp.sessionParameterRecord == b"dead" resp = tester.sr1(UDS() / UDS_DSC(diagnosticSessionType=9), timeout=1, verbose=False) assert resp.service == 0x50 assert resp.diagnosticSessionType == 9 assert resp.sessionParameterRecord == b"dead1" resp = tester.sr1(UDS() / UDS_RDBI(identifiers=[2, 3, 5, 9]), timeout=1, verbose=False) assert resp.service == 0x62 assert resp.dataIdentifier == 9 assert resp.load == b"deadbeef4" success = True except Exception as ex: print(ex) finally: tester.send(UDS(service=0xff)) sim.join(timeout=10) assert success = Simple check with security access and answers hook security_seed = b"abcd" def custom_answers(resp, req): global security_seed if req.service + 0x40 != resp.service or req.service != 0x27: return False if req.securityAccessType == 1: resp.securitySeed = security_seed return resp.answers(req) elif req.securityAccessType == 2: return resp.answers(req) and req.securityKey == security_seed + security_seed return False example_responses = \ [EcuResponse(EcuState(session=range(0,255)), responses=UDS() / UDS_SAPR(securityAccessType=1, securitySeed=b"1234"), answers=custom_answers), EcuResponse(EcuState(session=range(0,255)), responses=UDS() / UDS_SAPR(securityAccessType=2), answers=custom_answers), EcuResponse(EcuState(session=range(0,255)), responses=UDS() / UDS_NR(negativeResponseCode=0x35, requestServiceId=0x27)), EcuResponse(EcuState(session=range(0,255)), responses=UDS() / UDS_NR(negativeResponseCode=0x7f, requestServiceId=0x10))] success = False answering_machine = EcuAnsweringMachine(supported_responses=example_responses, main_socket=ecu, basecls=UDS) sim = threading.Thread(target=answering_machine, kwargs={'timeout': 10, 'stop_filter': lambda p: p.service==0xff}) sim.start() try: resp = tester.sr1(UDS() / UDS_SA(securityAccessType=1), timeout=1, verbose=False) assert resp.service == 0x67 assert resp.securitySeed == b"abcd" resp = tester.sr1(UDS() / UDS_SA(securityAccessType=2, securityKey=resp.securitySeed), timeout=1, verbose=False) assert resp.service == 0x7f assert resp.negativeResponseCode == 0x35 resp = tester.sr1(UDS() / UDS_SA(securityAccessType=1), timeout=1, verbose=False) assert resp.service == 0x67 assert resp.securitySeed == b"abcd" resp = tester.sr1(UDS() / UDS_SA(securityAccessType=2, securityKey=resp.securitySeed+resp.securitySeed), timeout=1, verbose=False) assert resp.service == 0x67 success = True except Exception as ex: print(ex) finally: tester.send(UDS(service=0xff)) sim.join(timeout=10) assert success = Simple check with security access and answers hook and request-correctly-received message security_seed = b"abcd" def custom_answers(resp, req): global security_seed if req.service + 0x40 != resp.service or req.service != 0x27: return False if req.securityAccessType == 1: resp.securitySeed = security_seed return resp.answers(req) elif req.securityAccessType == 2: return resp.answers(req) and req.securityKey == security_seed + security_seed return False example_responses = \ [EcuResponse(EcuState(session=range(0,255)), responses=[UDS()/UDS_NR(negativeResponseCode=0x78, requestServiceId=0x27), UDS() / UDS_SAPR(securityAccessType=1, securitySeed=b"1234")], answers=custom_answers), EcuResponse(EcuState(session=range(0,255)), responses=UDS() / UDS_SAPR(securityAccessType=2), answers=custom_answers), EcuResponse(EcuState(session=range(0,255)), responses=UDS() / UDS_NR(negativeResponseCode=0x35, requestServiceId=0x27)), EcuResponse(EcuState(session=range(0,255)), responses=UDS() / UDS_NR(negativeResponseCode=0x7f, requestServiceId=0x10))] success = False answering_machine = EcuAnsweringMachine(supported_responses=example_responses, main_socket=ecu, basecls=UDS) sim = threading.Thread(target=answering_machine, kwargs={'timeout': 10, 'stop_filter': lambda p: p.service==0xff}) sim.start() try: resp = tester.sr1(UDS() / UDS_SA(securityAccessType=1), timeout=2, verbose=False) assert resp.service == 0x67 assert resp.securitySeed == b"abcd" resp = tester.sr1(UDS() / UDS_SA(securityAccessType=2, securityKey=resp.securitySeed), timeout=2, verbose=False) assert resp.service == 0x7f assert resp.negativeResponseCode == 0x35 resp = tester.sr1(UDS() / UDS_SA(securityAccessType=1), timeout=2, verbose=False) assert resp.service == 0x67 assert resp.securitySeed == b"abcd" resp = tester.sr1(UDS() / UDS_SA(securityAccessType=2, securityKey=resp.securitySeed+resp.securitySeed), timeout=2, verbose=False) assert resp.service == 0x67 success = True except Exception as ex: print(ex) finally: tester.send(UDS(service=0xff)) sim.join(timeout=10) assert success = Simple check with security access and answers hook and request-correctly-received message 2 security_seed = b"abcd" def custom_answers(resp, req): global security_seed if req.service + 0x40 != resp.service or req.service != 0x27: return False if req.securityAccessType == 1: resp.securitySeed = security_seed return resp.answers(req) elif req.securityAccessType == 2: return resp.answers(req) and req.securityKey == security_seed + security_seed return False example_responses = \ [EcuResponse(EcuState(session=range(0,255)), responses=[UDS()/UDS_NR(negativeResponseCode=0x78, requestServiceId=0x27), UDS() / UDS_SAPR(securityAccessType=1, securitySeed=b"1234")], answers=custom_answers), EcuResponse(EcuState(session=range(0,255)), responses=UDS() / UDS_SAPR(securityAccessType=2), answers=custom_answers), EcuResponse(EcuState(session=range(0,255)), responses=UDS() / UDS_NR(negativeResponseCode=0x35, requestServiceId=0x27)), EcuResponse(EcuState(session=range(0,255)), responses=UDS() / UDS_NR(negativeResponseCode=0x7f, requestServiceId=0x10))] conf.contribs['UDS']['treat-response-pending-as-answer'] = True success = False answering_machine = EcuAnsweringMachine(supported_responses=example_responses, main_socket=ecu, basecls=UDS) sim = threading.Thread(target=answering_machine, kwargs={'timeout':5, 'stop_filter': lambda p: p.service==0xff}) sim.start() try: resp = tester.sr1(UDS() / UDS_SA(securityAccessType=1), timeout=1, verbose=False) assert resp.service == 0x7f assert resp.negativeResponseCode == 0x78 resp = tester.sniff(timeout=2, count=1, verbose=False)[0] assert resp.service == 0x67 assert resp.securitySeed == b"abcd" resp = tester.sr1(UDS() / UDS_SA(securityAccessType=2, securityKey=resp.securitySeed), timeout=3, verbose=False) assert resp.service == 0x7f assert resp.negativeResponseCode == 0x35 resp = tester.sr1(UDS() / UDS_SA(securityAccessType=1), timeout=1, verbose=False) assert resp.service == 0x7f assert resp.negativeResponseCode == 0x78 resp = tester.sniff(timeout=2, count=1, verbose=False)[0] assert resp.service == 0x67 assert resp.securitySeed == b"abcd" resp = tester.sr1(UDS() / UDS_SA(securityAccessType=2, securityKey=resp.securitySeed+resp.securitySeed), timeout=1, verbose=False) assert resp.service == 0x67 success = True except Exception as ex: print(ex) finally: tester.send(UDS(service=0xff)) sim.join(timeout=10) assert success conf.contribs['UDS']['treat-response-pending-as-answer'] = False + Cleanup = Delete TestSockets cleanup_testsockets()