1% Regression tests for obdscanner 2~ vcan_socket needs_root linux not_pypy automotive_comm scanner 3 4+ Configuration 5~ conf 6 7= Imports 8with open(scapy_path("test/contrib/automotive/interface_mockup.py")) as f: 9 exec(f.read()) 10 11load_contrib("automotive.ecu", globals_dict=globals()) 12 13+ Usage tests 14 15= Test wrong usage 16print(sys.executable) 17result = subprocess.Popen([sys.executable, "scapy/tools/automotive/obdscanner.py"], stdout=subprocess.PIPE, stderr=subprocess.PIPE) 18returncode = result.wait() 19std_out, std_err = result.communicate() 20assert returncode != 0 21 22expected_output = plain_str(b'usage:') 23assert expected_output in plain_str(std_err) 24 25 26= Test show help 27result = subprocess.Popen([sys.executable, "scapy/tools/automotive/obdscanner.py", "--help"], stdout=subprocess.PIPE, stderr=subprocess.PIPE) 28assert result.wait() == 0 29std_out, std_err = result.communicate() 30expected_output = plain_str(b'Scan for all possible obd service classes and their subfunctions.') 31assert expected_output in plain_str(std_out) 32 33+ Scan tests 34 35= Load contribution layer 36 37from scapy.contrib.automotive.obd.obd import * 38 39+ Simulate scanner 40 41= Test DTC scan 42 43drain_bus(iface0) 44 45s3 = OBD()/OBD_S03_PR(dtcs=[OBD_DTC()]) 46 47example_responses = [EcuResponse(responses=s3)] 48 49with new_can_socket0() as isocan, ISOTPSocket(isocan, 0x7e8, 0x7e0, basecls=OBD, padding=True) as ecu, \ 50 new_can_socket0() as isocan2, ISOTPSocket(isocan2, 0x7e0, 0x7e8, basecls=OBD, padding=True) as tester: 51 conf.verb = -1 52 answering_machine = EcuAnsweringMachine(supported_responses=example_responses, main_socket=ecu, basecls=OBD, verbose=False) 53 sim = threading.Thread(target=answering_machine, kwargs={'verbose': False, 'timeout': 15, 'stop_filter': lambda p: bytes(p) == b"\x01\xff\xff\xff\xff"}) 54 sim.start() 55 try: 56 result = subprocess.Popen([sys.executable, "scapy/tools/automotive/obdscanner.py"] + can_socket_string_list + ["-s", "0x7e0", "-d", "0x7e8", "-t", "0.30"], stdout=subprocess.PIPE, stderr=subprocess.PIPE) 57 std_out1, std_err1 = result.communicate() 58 result = subprocess.Popen([sys.executable, "scapy/tools/automotive/obdscanner.py"] + can_socket_string_list + ["-s", "0x7e0", "-d", "0x7e8", "-t", "0.30"], stdout=subprocess.PIPE, stderr=subprocess.PIPE) 59 std_out2, std_err2 = result.communicate() 60 except Exception as e: 61 print(e) 62 finally: 63 tester.send(b"\x01\xff\xff\xff\xff") 64 sim.join(timeout=10) 65 expected_output = b"1 requests were sent, 1 answered" 66 assert bytes_encode(expected_output) in bytes_encode(std_out1) or bytes_encode(expected_output) in bytes_encode(std_out2) 67 68= Test supported PIDs scan 69 70drain_bus(iface0) 71 72s1_pid00 = OBD()/OBD_S01_PR(data_records=[OBD_S01_PR_Record()/OBD_PID00(supported_pids="PID03+PID0B+PID0F")]) 73s6_mid00 = OBD()/OBD_S06_PR(data_records=[OBD_S06_PR_Record()/OBD_MID00(supported_mids="")]) 74s8_tid00 = OBD()/OBD_S08_PR(data_records=[OBD_S08_PR_Record()/OBD_TID00(supported_tids="")]) 75s9_iid00 = OBD()/OBD_S09_PR(data_records=[OBD_S09_PR_Record()/OBD_IID00(supported_iids="")]) 76 77s1_pid03 = OBD()/OBD_S01_PR(data_records=[OBD_S01_PR_Record()/OBD_PID03(fuel_system1=0, fuel_system2=2)]) 78s1_pid0B = OBD()/OBD_S01_PR(data_records=[OBD_S01_PR_Record()/OBD_PID0B(data=100)]) 79s1_pid0F = OBD()/OBD_S01_PR(data_records=[OBD_S01_PR_Record()/OBD_PID0F(data=50)]) 80 81# Create answers for 'supported PIDs scan' 82example_responses = \ 83 [EcuResponse(responses=s3), 84 EcuResponse(responses=s1_pid00), 85 EcuResponse(responses=s6_mid00), 86 EcuResponse(responses=s8_tid00), 87 EcuResponse(responses=s9_iid00), 88 EcuResponse(responses=s1_pid03), 89 EcuResponse(responses=s1_pid0B), 90 EcuResponse(responses=s1_pid0F)] 91 92 93 94with new_can_socket0() as isocan, ISOTPSocket(isocan, 0x7e8, 0x7e0, basecls=OBD, padding=True) as ecu, \ 95 new_can_socket0() as isocan2, ISOTPSocket(isocan2, 0x7e0, 0x7e8, basecls=OBD, padding=True) as tester: 96 answering_machine = EcuAnsweringMachine(supported_responses=example_responses, main_socket=ecu, basecls=OBD, verbose=False) 97 sim = threading.Thread(target=answering_machine, kwargs={'verbose': False, 'timeout': 100, 'stop_filter': lambda p: bytes(p) == b"\x01\xff\xff\xff\xff"}) 98 sim.start() 99 try: 100 result = subprocess.Popen([sys.executable, "scapy/tools/automotive/obdscanner.py"] + can_socket_string_list + ["-s", "0x7e0", "-d", "0x7e8", "-t", "0.30"], stdout=subprocess.PIPE, stderr=subprocess.PIPE) 101 std_out1, std_err1 = result.communicate() 102 print(std_out2, std_err2) 103 result = subprocess.Popen([sys.executable, "scapy/tools/automotive/obdscanner.py"] + can_socket_string_list + ["-s", "0x7e0", "-d", "0x7e8", "-t", "0.30"], stdout=subprocess.PIPE, stderr=subprocess.PIPE) 104 std_out2, std_err2 = result.communicate() 105 print(std_out2, std_err2) 106 except Exception as e: 107 print(e) 108 finally: 109 tester.send(b"\x01\xff\xff\xff\xff") 110 sim.join(timeout=10) 111 expected_output = ["supported_pids=PID0F+PID0B+PID03", "fuel_system1=OpenLoopInsufficientEngineTemperature fuel_system2=ClosedLoop", "data=100 kPa", "data=50.0 deg. C"] 112 for out in expected_output: 113 assert bytes_encode(out) in bytes_encode(std_out1) or bytes_encode(out) in bytes_encode(std_out2) 114 115= Test only Service 01 PIDs scan 116 117drain_bus(iface0) 118 119s1_pid00 = OBD()/OBD_S01_PR(data_records=[OBD_S01_PR_Record()/OBD_PID00(supported_pids="PID03+PID0B+PID0F")]) 120s6_mid00 = OBD()/OBD_S06_PR(data_records=[OBD_S06_PR_Record()/OBD_MID00(supported_mids="")]) 121s8_tid00 = OBD()/OBD_S08_PR(data_records=[OBD_S08_PR_Record()/OBD_TID00(supported_tids="")]) 122s9_iid00 = OBD()/OBD_S09_PR(data_records=[OBD_S09_PR_Record()/OBD_IID00(supported_iids="")]) 123 124s1_pid03 = OBD()/OBD_S01_PR(data_records=[OBD_S01_PR_Record()/OBD_PID03(fuel_system1=0, fuel_system2=2)]) 125s1_pid0B = OBD()/OBD_S01_PR(data_records=[OBD_S01_PR_Record()/OBD_PID0B(data=100)]) 126s1_pid0F = OBD()/OBD_S01_PR(data_records=[OBD_S01_PR_Record()/OBD_PID0F(data=50)]) 127 128# Create answers for 'supported PIDs scan' 129example_responses = \ 130 [EcuResponse(responses=s3), 131 EcuResponse(responses=s1_pid00), 132 EcuResponse(responses=s6_mid00), 133 EcuResponse(responses=s8_tid00), 134 EcuResponse(responses=s9_iid00), 135 EcuResponse(responses=s1_pid03), 136 EcuResponse(responses=s1_pid0B), 137 EcuResponse(responses=s1_pid0F)] 138 139 140 141with new_can_socket0() as isocan, ISOTPSocket(isocan, 0x7e8, 0x7e0, basecls=OBD, padding=True) as ecu, \ 142 new_can_socket0() as isocan2, ISOTPSocket(isocan2, 0x7e0, 0x7e8, basecls=OBD, padding=True) as tester: 143 answering_machine = EcuAnsweringMachine(supported_responses=example_responses, main_socket=ecu, basecls=OBD, verbose=False) 144 sim = threading.Thread(target=answering_machine, kwargs={'verbose': False, 'timeout': 100, 'stop_filter': lambda p: bytes(p) == b"\x01\xff\xff\xff\xff"}) 145 sim.start() 146 try: 147 result = subprocess.Popen([sys.executable, "scapy/tools/automotive/obdscanner.py"] + can_socket_string_list + ["-s", "0x7e0", "-d", "0x7e8", "-t", "0.30", "-1"], stdout=subprocess.PIPE, stderr=subprocess.PIPE) 148 std_out1, std_err1 = result.communicate() 149 print(std_out1, std_err1) 150 result = subprocess.Popen([sys.executable, "scapy/tools/automotive/obdscanner.py"] + can_socket_string_list + ["-s", "0x7e0", "-d", "0x7e8", "-t", "0.30", "-1"], stdout=subprocess.PIPE, stderr=subprocess.PIPE) 151 std_out2, std_err2 = result.communicate() 152 print(std_out2, std_err2) 153 except Exception as e: 154 print(e) 155 finally: 156 tester.send(b"\x01\xff\xff\xff\xff") 157 sim.join(timeout=10) 158 expected_output = ["supported_pids=PID0F+PID0B+PID03", "fuel_system1=OpenLoopInsufficientEngineTemperature fuel_system2=ClosedLoop", "data=100 kPa", "data=50.0 deg. C"] 159 for out in expected_output: 160 assert bytes_encode(out) in bytes_encode(std_out1) or bytes_encode(out) in bytes_encode(std_out2) 161 162 163= Test full scan 164 165drain_bus(iface0) 166 167# Add unsupported PID 168s1_pid01 = OBD()/OBD_S01_PR(data_records=[OBD_S01_PR_Record()/OBD_PID01()]) 169example_responses.append(EcuResponse(responses=s1_pid01)) 170 171with new_can_socket0() as isocan, ISOTPSocket(isocan, 0x7e8, 0x7e0, basecls=OBD, padding=True) as ecu, \ 172 new_can_socket0() as isocan2, ISOTPSocket(isocan2, 0x7e0, 0x7e8, basecls=OBD, padding=True) as tester: 173 answering_machine = EcuAnsweringMachine(supported_responses=example_responses, main_socket=ecu, basecls=OBD, verbose=False) 174 sim = threading.Thread(target=answering_machine, kwargs={'verbose': False, 'timeout': 100, 'stop_filter': lambda p: bytes(p) == b"\x01\xff\xff\xff\xff"}) 175 sim.start() 176 try: 177 result = subprocess.Popen([sys.executable, "scapy/tools/automotive/obdscanner.py"] + can_socket_string_list + ["-s", "0x7e0", "-d", "0x7e8", "-t", "0.30", "-f", "-1", "-3"], stdout=subprocess.PIPE, stderr=subprocess.PIPE) 178 std_out1, std_err1 = result.communicate() 179 result = subprocess.Popen([sys.executable, "scapy/tools/automotive/obdscanner.py"] + can_socket_string_list + ["-s", "0x7e0", "-d", "0x7e8", "-t", "0.30", "-f", "-1", "-3"], stdout=subprocess.PIPE, stderr=subprocess.PIPE) 180 std_out2, std_err2 = result.communicate() 181 except Exception as e: 182 print(e) 183 finally: 184 tester.send(b"\x01\xff\xff\xff\xff") 185 sim.join(timeout=10) 186 expected_output = ["256 requests were sent", "1 requests were sent, 1 answered"] 187 for out in expected_output: 188 assert bytes_encode(out) in bytes_encode(std_out1) or bytes_encode(out) in bytes_encode(std_out2) 189 190+ Cleanup 191 192= Delete vcan interfaces 193 194assert cleanup_interfaces() 195