• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1% Regression tests for obdscanner
2~ vcan_socket needs_root linux not_pypy automotive_comm scanner
3
4+ Configuration
5~ conf
6
7= Imports
8with open(scapy_path("test/contrib/automotive/interface_mockup.py")) as f:
9    exec(f.read())
10
11load_contrib("automotive.ecu", globals_dict=globals())
12
13+ Usage tests
14
15= Test wrong usage
16print(sys.executable)
17result = subprocess.Popen([sys.executable, "scapy/tools/automotive/obdscanner.py"], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
18returncode = result.wait()
19std_out, std_err = result.communicate()
20assert returncode != 0
21
22expected_output = plain_str(b'usage:')
23assert expected_output in plain_str(std_err)
24
25
26= Test show help
27result = subprocess.Popen([sys.executable, "scapy/tools/automotive/obdscanner.py", "--help"], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
28assert result.wait() == 0
29std_out, std_err = result.communicate()
30expected_output = plain_str(b'Scan for all possible obd service classes and their subfunctions.')
31assert expected_output in plain_str(std_out)
32
33+ Scan tests
34
35= Load contribution layer
36
37from scapy.contrib.automotive.obd.obd import *
38
39+ Simulate scanner
40
41= Test DTC scan
42
43drain_bus(iface0)
44
45s3 = OBD()/OBD_S03_PR(dtcs=[OBD_DTC()])
46
47example_responses = [EcuResponse(responses=s3)]
48
49with new_can_socket0() as isocan, ISOTPSocket(isocan, 0x7e8, 0x7e0, basecls=OBD, padding=True) as ecu, \
50        new_can_socket0() as isocan2, ISOTPSocket(isocan2, 0x7e0, 0x7e8, basecls=OBD, padding=True) as tester:
51    conf.verb = -1
52    answering_machine = EcuAnsweringMachine(supported_responses=example_responses, main_socket=ecu, basecls=OBD, verbose=False)
53    sim = threading.Thread(target=answering_machine, kwargs={'verbose': False, 'timeout': 15, 'stop_filter': lambda p: bytes(p) == b"\x01\xff\xff\xff\xff"})
54    sim.start()
55    try:
56        result = subprocess.Popen([sys.executable, "scapy/tools/automotive/obdscanner.py"] + can_socket_string_list + ["-s", "0x7e0", "-d", "0x7e8", "-t", "0.30"], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
57        std_out1, std_err1 = result.communicate()
58        result = subprocess.Popen([sys.executable, "scapy/tools/automotive/obdscanner.py"] + can_socket_string_list + ["-s", "0x7e0", "-d", "0x7e8", "-t", "0.30"], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
59        std_out2, std_err2 = result.communicate()
60    except Exception as e:
61        print(e)
62    finally:
63        tester.send(b"\x01\xff\xff\xff\xff")
64        sim.join(timeout=10)
65    expected_output = b"1 requests were sent, 1 answered"
66    assert bytes_encode(expected_output) in bytes_encode(std_out1) or bytes_encode(expected_output) in bytes_encode(std_out2)
67
68= Test supported PIDs scan
69
70drain_bus(iface0)
71
72s1_pid00 = OBD()/OBD_S01_PR(data_records=[OBD_S01_PR_Record()/OBD_PID00(supported_pids="PID03+PID0B+PID0F")])
73s6_mid00 = OBD()/OBD_S06_PR(data_records=[OBD_S06_PR_Record()/OBD_MID00(supported_mids="")])
74s8_tid00 = OBD()/OBD_S08_PR(data_records=[OBD_S08_PR_Record()/OBD_TID00(supported_tids="")])
75s9_iid00 = OBD()/OBD_S09_PR(data_records=[OBD_S09_PR_Record()/OBD_IID00(supported_iids="")])
76
77s1_pid03 = OBD()/OBD_S01_PR(data_records=[OBD_S01_PR_Record()/OBD_PID03(fuel_system1=0, fuel_system2=2)])
78s1_pid0B = OBD()/OBD_S01_PR(data_records=[OBD_S01_PR_Record()/OBD_PID0B(data=100)])
79s1_pid0F = OBD()/OBD_S01_PR(data_records=[OBD_S01_PR_Record()/OBD_PID0F(data=50)])
80
81# Create answers for 'supported PIDs scan'
82example_responses = \
83    [EcuResponse(responses=s3),
84     EcuResponse(responses=s1_pid00),
85     EcuResponse(responses=s6_mid00),
86     EcuResponse(responses=s8_tid00),
87     EcuResponse(responses=s9_iid00),
88     EcuResponse(responses=s1_pid03),
89     EcuResponse(responses=s1_pid0B),
90     EcuResponse(responses=s1_pid0F)]
91
92
93
94with new_can_socket0() as isocan, ISOTPSocket(isocan, 0x7e8, 0x7e0, basecls=OBD, padding=True) as ecu, \
95        new_can_socket0() as isocan2, ISOTPSocket(isocan2, 0x7e0, 0x7e8, basecls=OBD, padding=True) as tester:
96    answering_machine = EcuAnsweringMachine(supported_responses=example_responses, main_socket=ecu, basecls=OBD, verbose=False)
97    sim = threading.Thread(target=answering_machine, kwargs={'verbose': False, 'timeout': 100, 'stop_filter': lambda p: bytes(p) == b"\x01\xff\xff\xff\xff"})
98    sim.start()
99    try:
100        result = subprocess.Popen([sys.executable, "scapy/tools/automotive/obdscanner.py"] + can_socket_string_list + ["-s", "0x7e0", "-d", "0x7e8", "-t", "0.30"], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
101        std_out1, std_err1 = result.communicate()
102        print(std_out2, std_err2)
103        result = subprocess.Popen([sys.executable, "scapy/tools/automotive/obdscanner.py"] + can_socket_string_list + ["-s", "0x7e0", "-d", "0x7e8", "-t", "0.30"], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
104        std_out2, std_err2 = result.communicate()
105        print(std_out2, std_err2)
106    except Exception as e:
107        print(e)
108    finally:
109        tester.send(b"\x01\xff\xff\xff\xff")
110        sim.join(timeout=10)
111    expected_output = ["supported_pids=PID0F+PID0B+PID03", "fuel_system1=OpenLoopInsufficientEngineTemperature fuel_system2=ClosedLoop", "data=100 kPa", "data=50.0 deg. C"]
112    for out in expected_output:
113        assert bytes_encode(out) in bytes_encode(std_out1) or bytes_encode(out) in bytes_encode(std_out2)
114
115= Test only Service 01 PIDs scan
116
117drain_bus(iface0)
118
119s1_pid00 = OBD()/OBD_S01_PR(data_records=[OBD_S01_PR_Record()/OBD_PID00(supported_pids="PID03+PID0B+PID0F")])
120s6_mid00 = OBD()/OBD_S06_PR(data_records=[OBD_S06_PR_Record()/OBD_MID00(supported_mids="")])
121s8_tid00 = OBD()/OBD_S08_PR(data_records=[OBD_S08_PR_Record()/OBD_TID00(supported_tids="")])
122s9_iid00 = OBD()/OBD_S09_PR(data_records=[OBD_S09_PR_Record()/OBD_IID00(supported_iids="")])
123
124s1_pid03 = OBD()/OBD_S01_PR(data_records=[OBD_S01_PR_Record()/OBD_PID03(fuel_system1=0, fuel_system2=2)])
125s1_pid0B = OBD()/OBD_S01_PR(data_records=[OBD_S01_PR_Record()/OBD_PID0B(data=100)])
126s1_pid0F = OBD()/OBD_S01_PR(data_records=[OBD_S01_PR_Record()/OBD_PID0F(data=50)])
127
128# Create answers for 'supported PIDs scan'
129example_responses = \
130    [EcuResponse(responses=s3),
131     EcuResponse(responses=s1_pid00),
132     EcuResponse(responses=s6_mid00),
133     EcuResponse(responses=s8_tid00),
134     EcuResponse(responses=s9_iid00),
135     EcuResponse(responses=s1_pid03),
136     EcuResponse(responses=s1_pid0B),
137     EcuResponse(responses=s1_pid0F)]
138
139
140
141with new_can_socket0() as isocan, ISOTPSocket(isocan, 0x7e8, 0x7e0, basecls=OBD, padding=True) as ecu, \
142        new_can_socket0() as isocan2, ISOTPSocket(isocan2, 0x7e0, 0x7e8, basecls=OBD, padding=True) as tester:
143    answering_machine = EcuAnsweringMachine(supported_responses=example_responses, main_socket=ecu, basecls=OBD, verbose=False)
144    sim = threading.Thread(target=answering_machine, kwargs={'verbose': False, 'timeout': 100, 'stop_filter':  lambda p: bytes(p) == b"\x01\xff\xff\xff\xff"})
145    sim.start()
146    try:
147        result = subprocess.Popen([sys.executable, "scapy/tools/automotive/obdscanner.py"] + can_socket_string_list + ["-s", "0x7e0", "-d", "0x7e8", "-t", "0.30", "-1"], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
148        std_out1, std_err1 = result.communicate()
149        print(std_out1, std_err1)
150        result = subprocess.Popen([sys.executable, "scapy/tools/automotive/obdscanner.py"] + can_socket_string_list + ["-s", "0x7e0", "-d", "0x7e8", "-t", "0.30", "-1"], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
151        std_out2, std_err2 = result.communicate()
152        print(std_out2, std_err2)
153    except Exception as e:
154        print(e)
155    finally:
156        tester.send(b"\x01\xff\xff\xff\xff")
157        sim.join(timeout=10)
158    expected_output = ["supported_pids=PID0F+PID0B+PID03", "fuel_system1=OpenLoopInsufficientEngineTemperature fuel_system2=ClosedLoop", "data=100 kPa", "data=50.0 deg. C"]
159    for out in expected_output:
160        assert bytes_encode(out) in bytes_encode(std_out1) or bytes_encode(out) in bytes_encode(std_out2)
161
162
163= Test full scan
164
165drain_bus(iface0)
166
167# Add unsupported PID
168s1_pid01 = OBD()/OBD_S01_PR(data_records=[OBD_S01_PR_Record()/OBD_PID01()])
169example_responses.append(EcuResponse(responses=s1_pid01))
170
171with new_can_socket0() as isocan, ISOTPSocket(isocan, 0x7e8, 0x7e0, basecls=OBD, padding=True) as ecu, \
172        new_can_socket0() as isocan2, ISOTPSocket(isocan2, 0x7e0, 0x7e8, basecls=OBD, padding=True) as tester:
173    answering_machine = EcuAnsweringMachine(supported_responses=example_responses, main_socket=ecu, basecls=OBD, verbose=False)
174    sim = threading.Thread(target=answering_machine, kwargs={'verbose': False, 'timeout': 100, 'stop_filter': lambda p: bytes(p) == b"\x01\xff\xff\xff\xff"})
175    sim.start()
176    try:
177        result = subprocess.Popen([sys.executable, "scapy/tools/automotive/obdscanner.py"] + can_socket_string_list + ["-s", "0x7e0", "-d", "0x7e8", "-t", "0.30", "-f", "-1", "-3"], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
178        std_out1, std_err1 = result.communicate()
179        result = subprocess.Popen([sys.executable, "scapy/tools/automotive/obdscanner.py"] + can_socket_string_list + ["-s", "0x7e0", "-d", "0x7e8", "-t", "0.30", "-f", "-1", "-3"], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
180        std_out2, std_err2 = result.communicate()
181    except Exception as e:
182        print(e)
183    finally:
184        tester.send(b"\x01\xff\xff\xff\xff")
185        sim.join(timeout=10)
186    expected_output = ["256 requests were sent", "1 requests were sent, 1 answered"]
187    for out in expected_output:
188        assert bytes_encode(out) in bytes_encode(std_out1) or bytes_encode(out) in bytes_encode(std_out2)
189
190+ Cleanup
191
192= Delete vcan interfaces
193
194assert cleanup_interfaces()
195