% Regression tests for obdscanner ~ vcan_socket needs_root linux not_pypy automotive_comm scanner + Configuration ~ conf = Imports with open(scapy_path("test/contrib/automotive/interface_mockup.py")) as f: exec(f.read()) load_contrib("automotive.ecu", globals_dict=globals()) + Usage tests = Test wrong usage print(sys.executable) result = subprocess.Popen([sys.executable, "scapy/tools/automotive/obdscanner.py"], stdout=subprocess.PIPE, stderr=subprocess.PIPE) returncode = result.wait() std_out, std_err = result.communicate() assert returncode != 0 expected_output = plain_str(b'usage:') assert expected_output in plain_str(std_err) = Test show help result = subprocess.Popen([sys.executable, "scapy/tools/automotive/obdscanner.py", "--help"], stdout=subprocess.PIPE, stderr=subprocess.PIPE) assert result.wait() == 0 std_out, std_err = result.communicate() expected_output = plain_str(b'Scan for all possible obd service classes and their subfunctions.') assert expected_output in plain_str(std_out) + Scan tests = Load contribution layer from scapy.contrib.automotive.obd.obd import * + Simulate scanner = Test DTC scan drain_bus(iface0) s3 = OBD()/OBD_S03_PR(dtcs=[OBD_DTC()]) example_responses = [EcuResponse(responses=s3)] with new_can_socket0() as isocan, ISOTPSocket(isocan, 0x7e8, 0x7e0, basecls=OBD, padding=True) as ecu, \ new_can_socket0() as isocan2, ISOTPSocket(isocan2, 0x7e0, 0x7e8, basecls=OBD, padding=True) as tester: conf.verb = -1 answering_machine = EcuAnsweringMachine(supported_responses=example_responses, main_socket=ecu, basecls=OBD, verbose=False) sim = threading.Thread(target=answering_machine, kwargs={'verbose': False, 'timeout': 15, 'stop_filter': lambda p: bytes(p) == b"\x01\xff\xff\xff\xff"}) sim.start() try: result = subprocess.Popen([sys.executable, "scapy/tools/automotive/obdscanner.py"] + can_socket_string_list + ["-s", "0x7e0", "-d", "0x7e8", "-t", "0.30"], stdout=subprocess.PIPE, stderr=subprocess.PIPE) std_out1, std_err1 = result.communicate() result = subprocess.Popen([sys.executable, "scapy/tools/automotive/obdscanner.py"] + can_socket_string_list + ["-s", "0x7e0", "-d", "0x7e8", "-t", "0.30"], stdout=subprocess.PIPE, stderr=subprocess.PIPE) std_out2, std_err2 = result.communicate() except Exception as e: print(e) finally: tester.send(b"\x01\xff\xff\xff\xff") sim.join(timeout=10) expected_output = b"1 requests were sent, 1 answered" assert bytes_encode(expected_output) in bytes_encode(std_out1) or bytes_encode(expected_output) in bytes_encode(std_out2) = Test supported PIDs scan drain_bus(iface0) s1_pid00 = OBD()/OBD_S01_PR(data_records=[OBD_S01_PR_Record()/OBD_PID00(supported_pids="PID03+PID0B+PID0F")]) s6_mid00 = OBD()/OBD_S06_PR(data_records=[OBD_S06_PR_Record()/OBD_MID00(supported_mids="")]) s8_tid00 = OBD()/OBD_S08_PR(data_records=[OBD_S08_PR_Record()/OBD_TID00(supported_tids="")]) s9_iid00 = OBD()/OBD_S09_PR(data_records=[OBD_S09_PR_Record()/OBD_IID00(supported_iids="")]) s1_pid03 = OBD()/OBD_S01_PR(data_records=[OBD_S01_PR_Record()/OBD_PID03(fuel_system1=0, fuel_system2=2)]) s1_pid0B = OBD()/OBD_S01_PR(data_records=[OBD_S01_PR_Record()/OBD_PID0B(data=100)]) s1_pid0F = OBD()/OBD_S01_PR(data_records=[OBD_S01_PR_Record()/OBD_PID0F(data=50)]) # Create answers for 'supported PIDs scan' example_responses = \ [EcuResponse(responses=s3), EcuResponse(responses=s1_pid00), EcuResponse(responses=s6_mid00), EcuResponse(responses=s8_tid00), EcuResponse(responses=s9_iid00), EcuResponse(responses=s1_pid03), EcuResponse(responses=s1_pid0B), EcuResponse(responses=s1_pid0F)] with new_can_socket0() as isocan, ISOTPSocket(isocan, 0x7e8, 0x7e0, basecls=OBD, padding=True) as ecu, \ new_can_socket0() as isocan2, ISOTPSocket(isocan2, 0x7e0, 0x7e8, basecls=OBD, padding=True) as tester: answering_machine = EcuAnsweringMachine(supported_responses=example_responses, main_socket=ecu, basecls=OBD, verbose=False) sim = threading.Thread(target=answering_machine, kwargs={'verbose': False, 'timeout': 100, 'stop_filter': lambda p: bytes(p) == b"\x01\xff\xff\xff\xff"}) sim.start() try: result = subprocess.Popen([sys.executable, "scapy/tools/automotive/obdscanner.py"] + can_socket_string_list + ["-s", "0x7e0", "-d", "0x7e8", "-t", "0.30"], stdout=subprocess.PIPE, stderr=subprocess.PIPE) std_out1, std_err1 = result.communicate() print(std_out2, std_err2) result = subprocess.Popen([sys.executable, "scapy/tools/automotive/obdscanner.py"] + can_socket_string_list + ["-s", "0x7e0", "-d", "0x7e8", "-t", "0.30"], stdout=subprocess.PIPE, stderr=subprocess.PIPE) std_out2, std_err2 = result.communicate() print(std_out2, std_err2) except Exception as e: print(e) finally: tester.send(b"\x01\xff\xff\xff\xff") sim.join(timeout=10) expected_output = ["supported_pids=PID0F+PID0B+PID03", "fuel_system1=OpenLoopInsufficientEngineTemperature fuel_system2=ClosedLoop", "data=100 kPa", "data=50.0 deg. C"] for out in expected_output: assert bytes_encode(out) in bytes_encode(std_out1) or bytes_encode(out) in bytes_encode(std_out2) = Test only Service 01 PIDs scan drain_bus(iface0) s1_pid00 = OBD()/OBD_S01_PR(data_records=[OBD_S01_PR_Record()/OBD_PID00(supported_pids="PID03+PID0B+PID0F")]) s6_mid00 = OBD()/OBD_S06_PR(data_records=[OBD_S06_PR_Record()/OBD_MID00(supported_mids="")]) s8_tid00 = OBD()/OBD_S08_PR(data_records=[OBD_S08_PR_Record()/OBD_TID00(supported_tids="")]) s9_iid00 = OBD()/OBD_S09_PR(data_records=[OBD_S09_PR_Record()/OBD_IID00(supported_iids="")]) s1_pid03 = OBD()/OBD_S01_PR(data_records=[OBD_S01_PR_Record()/OBD_PID03(fuel_system1=0, fuel_system2=2)]) s1_pid0B = OBD()/OBD_S01_PR(data_records=[OBD_S01_PR_Record()/OBD_PID0B(data=100)]) s1_pid0F = OBD()/OBD_S01_PR(data_records=[OBD_S01_PR_Record()/OBD_PID0F(data=50)]) # Create answers for 'supported PIDs scan' example_responses = \ [EcuResponse(responses=s3), EcuResponse(responses=s1_pid00), EcuResponse(responses=s6_mid00), EcuResponse(responses=s8_tid00), EcuResponse(responses=s9_iid00), EcuResponse(responses=s1_pid03), EcuResponse(responses=s1_pid0B), EcuResponse(responses=s1_pid0F)] with new_can_socket0() as isocan, ISOTPSocket(isocan, 0x7e8, 0x7e0, basecls=OBD, padding=True) as ecu, \ new_can_socket0() as isocan2, ISOTPSocket(isocan2, 0x7e0, 0x7e8, basecls=OBD, padding=True) as tester: answering_machine = EcuAnsweringMachine(supported_responses=example_responses, main_socket=ecu, basecls=OBD, verbose=False) sim = threading.Thread(target=answering_machine, kwargs={'verbose': False, 'timeout': 100, 'stop_filter': lambda p: bytes(p) == b"\x01\xff\xff\xff\xff"}) sim.start() try: result = subprocess.Popen([sys.executable, "scapy/tools/automotive/obdscanner.py"] + can_socket_string_list + ["-s", "0x7e0", "-d", "0x7e8", "-t", "0.30", "-1"], stdout=subprocess.PIPE, stderr=subprocess.PIPE) std_out1, std_err1 = result.communicate() print(std_out1, std_err1) result = subprocess.Popen([sys.executable, "scapy/tools/automotive/obdscanner.py"] + can_socket_string_list + ["-s", "0x7e0", "-d", "0x7e8", "-t", "0.30", "-1"], stdout=subprocess.PIPE, stderr=subprocess.PIPE) std_out2, std_err2 = result.communicate() print(std_out2, std_err2) except Exception as e: print(e) finally: tester.send(b"\x01\xff\xff\xff\xff") sim.join(timeout=10) expected_output = ["supported_pids=PID0F+PID0B+PID03", "fuel_system1=OpenLoopInsufficientEngineTemperature fuel_system2=ClosedLoop", "data=100 kPa", "data=50.0 deg. C"] for out in expected_output: assert bytes_encode(out) in bytes_encode(std_out1) or bytes_encode(out) in bytes_encode(std_out2) = Test full scan drain_bus(iface0) # Add unsupported PID s1_pid01 = OBD()/OBD_S01_PR(data_records=[OBD_S01_PR_Record()/OBD_PID01()]) example_responses.append(EcuResponse(responses=s1_pid01)) with new_can_socket0() as isocan, ISOTPSocket(isocan, 0x7e8, 0x7e0, basecls=OBD, padding=True) as ecu, \ new_can_socket0() as isocan2, ISOTPSocket(isocan2, 0x7e0, 0x7e8, basecls=OBD, padding=True) as tester: answering_machine = EcuAnsweringMachine(supported_responses=example_responses, main_socket=ecu, basecls=OBD, verbose=False) sim = threading.Thread(target=answering_machine, kwargs={'verbose': False, 'timeout': 100, 'stop_filter': lambda p: bytes(p) == b"\x01\xff\xff\xff\xff"}) sim.start() try: result = subprocess.Popen([sys.executable, "scapy/tools/automotive/obdscanner.py"] + can_socket_string_list + ["-s", "0x7e0", "-d", "0x7e8", "-t", "0.30", "-f", "-1", "-3"], stdout=subprocess.PIPE, stderr=subprocess.PIPE) std_out1, std_err1 = result.communicate() result = subprocess.Popen([sys.executable, "scapy/tools/automotive/obdscanner.py"] + can_socket_string_list + ["-s", "0x7e0", "-d", "0x7e8", "-t", "0.30", "-f", "-1", "-3"], stdout=subprocess.PIPE, stderr=subprocess.PIPE) std_out2, std_err2 = result.communicate() except Exception as e: print(e) finally: tester.send(b"\x01\xff\xff\xff\xff") sim.join(timeout=10) expected_output = ["256 requests were sent", "1 requests were sent, 1 answered"] for out in expected_output: assert bytes_encode(out) in bytes_encode(std_out1) or bytes_encode(out) in bytes_encode(std_out2) + Cleanup = Delete vcan interfaces assert cleanup_interfaces()