• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1% Regression tests for compatibility between NativeCANSocket and PythonCANSocket
2~ not_pypy vcan_socket needs_root linux
3
4# More information at http://www.secdev.org/projects/UTscapy/
5
6
7############
8############
9+ Configuration of CAN virtual sockets
10~ conf
11
12= Load module
13load_layer("can", globals_dict=globals())
14from scapy.contrib.cansocket_python_can import PythonCANSocket
15from scapy.contrib.cansocket_native import NativeCANSocket
16
17conf.contribs['CAN'] = {'swap-bytes': False, 'remove-padding': True}
18
19= Setup string for vcan
20bashCommand = "/bin/bash -c 'sudo modprobe vcan; sudo ip link add name vcan0 type vcan; sudo ip link set dev vcan0 up'"
21
22= Load os
23import os
24import threading
25from subprocess import call
26
27= Setup vcan0
28
29assert 0 == os.system(bashCommand)
30
31= Define common used functions
32
33send_done = threading.Event()
34
35def sender(sock, msg):
36    if not hasattr(msg, "__iter__"):
37        msg = [msg]
38    for m in msg:
39        sock.send(m)
40    send_done.set()
41
42+ Basic Socket Tests
43
44= NativeCANSocket send recv small packet
45
46sock1 = NativeCANSocket(bustype='socketcan', channel='vcan0')
47sock2 = NativeCANSocket(bustype='socketcan', channel='vcan0')
48
49sock2.send(CAN(identifier=0x7ff,length=1,data=b'\x01'))
50rx = sock1.recv()
51sock1.close()
52sock2.close()
53
54assert rx == CAN(identifier=0x7ff,length=1,data=b'\x01')
55
56= NativeCANSocket send recv small packet test with
57
58with NativeCANSocket(bustype='socketcan', channel='vcan0') as sock1, \
59        NativeCANSocket(bustype='socketcan', channel='vcan0') as sock2:
60    sock2.send(CAN(identifier=0x7ff,length=1,data=b'\x01'))
61    rx = sock1.recv()
62
63assert rx == CAN(identifier=0x7ff,length=1,data=b'\x01')
64
65= PythonCANSocket send recv small packet
66
67sock1 = PythonCANSocket(bustype='socketcan', channel='vcan0')
68sock2 = PythonCANSocket(bustype='socketcan', channel='vcan0')
69
70sock2.send(CAN(identifier=0x7ff,length=1,data=b'\x01'))
71rx = sock1.recv()
72sock1.close()
73sock2.close()
74
75assert rx == CAN(identifier=0x7ff,length=1,data=b'\x01')
76
77= PythonCANSocket send recv small packet test with
78
79with PythonCANSocket(bustype='socketcan', channel='vcan0') as sock1, \
80        PythonCANSocket(bustype='socketcan', channel='vcan0') as sock2:
81    sock2.send(CAN(identifier=0x7ff,length=1,data=b'\x01'))
82    rx = sock1.recv()
83
84assert rx == CAN(identifier=0x7ff,length=1,data=b'\x01')
85
86= NativeCANSocket send recv swapped
87
88conf.contribs['CAN']['swap-bytes'] = True
89
90with NativeCANSocket(bustype='socketcan', channel='vcan0') as sock1, \
91        NativeCANSocket(bustype='socketcan', channel='vcan0') as sock2:
92    sock2.send(CAN(identifier=0x7ff,length=8,data=b'\x01\x02\x03\x04\x05\x06\x07\x08'))
93    rx = sock1.sniff(count=1, timeout=1)
94    assert len(rx) == 1
95    assert rx[0] == CAN(identifier=0x7ff,length=8,data=b'\x01\x02\x03\x04\x05\x06\x07\x08')
96
97conf.contribs['CAN']['swap-bytes'] = False
98
99= PythonCANSocket send recv swapped
100
101conf.contribs['CAN']['swap-bytes'] = True
102
103with PythonCANSocket(bustype='socketcan', channel='vcan0') as sock1, \
104        PythonCANSocket(bustype='socketcan', channel='vcan0') as sock2:
105    sock2.send(CAN(identifier=0x7ff,length=8,data=b'\x01\x02\x03\x04\x05\x06\x07\x08'))
106    rx = sock1.sniff(count=1, timeout=1)
107    assert rx[0] == CAN(identifier=0x7ff,length=8,data=b'\x01\x02\x03\x04\x05\x06\x07\x08')
108
109conf.contribs['CAN']['swap-bytes'] = False
110
111= NativeCANSocket sniff with filtermask 0x7ff
112
113msgs = [CAN(identifier=0x200, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'),
114    CAN(identifier=0x300, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'),
115    CAN(identifier=0x300, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'),
116    CAN(identifier=0x200, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'),
117    CAN(identifier=0x100, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'),
118    CAN(identifier=0x200, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')]
119
120with NativeCANSocket(bustype='socketcan', channel='vcan0', can_filters=[{'can_id': 0x200, 'can_mask': 0x7ff}]) as sock1, \
121        NativeCANSocket(bustype='socketcan', channel='vcan0') as sock2:
122    for m in msgs:
123        sock2.send(m)
124    packets = sock1.sniff(timeout=0.1, count=3)
125    assert len(packets) == 3
126
127= PythonCANSocket sniff with filtermask 0x7ff
128
129msgs = [CAN(identifier=0x200, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'),
130    CAN(identifier=0x300, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'),
131    CAN(identifier=0x300, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'),
132    CAN(identifier=0x200, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'),
133    CAN(identifier=0x100, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'),
134    CAN(identifier=0x200, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')]
135
136with PythonCANSocket(bustype='socketcan', channel='vcan0', can_filters=[{'can_id': 0x200, 'can_mask': 0x7ff}]) as sock1, \
137        PythonCANSocket(bustype='socketcan', channel='vcan0') as sock2:
138    for m in msgs:
139        sock2.send(m)
140    packets = sock1.sniff(timeout=0.1, count=3)
141    assert len(packets) == 3
142
143= NativeCANSocket sniff with multiple filters
144
145msgs = [CAN(identifier=0x200, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'),
146    CAN(identifier=0x300, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'),
147    CAN(identifier=0x400, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'),
148    CAN(identifier=0x500, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'),
149    CAN(identifier=0x600, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'),
150    CAN(identifier=0x700, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'),
151    CAN(identifier=0x7ff, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')]
152
153with NativeCANSocket(bustype='socketcan', channel='vcan0', can_filters=[{'can_id': 0x200, 'can_mask': 0x7ff}, {'can_id': 0x400, 'can_mask': 0x7ff}, {'can_id': 0x600, 'can_mask': 0x7ff}, {'can_id': 0x7ff, 'can_mask': 0x7ff}]) as sock1, \
154        NativeCANSocket(bustype='socketcan', channel='vcan0') as sock2:
155    for m in msgs:
156        sock2.send(m)
157    packets = sock1.sniff(timeout=0.1, count=4)
158    assert len(packets) == 4
159
160= PythonCANSocket sniff with multiple filters
161
162msgs = [CAN(identifier=0x200, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'),
163    CAN(identifier=0x300, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'),
164    CAN(identifier=0x400, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'),
165    CAN(identifier=0x500, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'),
166    CAN(identifier=0x600, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'),
167    CAN(identifier=0x700, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'),
168    CAN(identifier=0x7ff, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')]
169
170with PythonCANSocket(bustype='socketcan', channel='vcan0', can_filters=[{'can_id': 0x200, 'can_mask': 0x7ff}, {'can_id': 0x400, 'can_mask': 0x7ff}, {'can_id': 0x600, 'can_mask': 0x7ff}, {'can_id': 0x7ff, 'can_mask': 0x7ff}]) as sock1, \
171        PythonCANSocket(bustype='socketcan', channel='vcan0') as sock2:
172    for m in msgs:
173        sock2.send(m)
174    packets = sock1.sniff(timeout=0.1, count=4)
175    assert len(packets) == 4
176
177+ bridge and sniff tests
178
179= Setup vcan1 interface
180
181bashCommand = "/bin/bash -c 'sudo ip link add name vcan1 type vcan; sudo ip link set dev vcan1 up'"
182assert 0 == os.system(bashCommand)
183
184= NativeCANSocket bridge and sniff setup vcan1 package forwarding
185
186sock0 = NativeCANSocket(bustype='socketcan', channel='vcan0')
187sock1 = NativeCANSocket(bustype='socketcan', channel='vcan1')
188
189bridgeStarted = threading.Event()
190def bridge():
191    global bridgeStarted
192    bSock0 = NativeCANSocket(
193        bustype='socketcan', channel='vcan0',
194                                bitrate=250000)
195    bSock1 = NativeCANSocket(
196        bustype='socketcan', channel='vcan1',
197                                bitrate=250000)
198    def pnr(pkt):
199        return pkt
200    bSock0.timeout = 0.01
201    bSock1.timeout = 0.01
202    bridge_and_sniff(if1=bSock0, if2=bSock1, xfrm12=pnr, xfrm21=pnr, timeout=0.5, started_callback=bridgeStarted.set, count=6)
203    bSock0.close()
204    bSock1.close()
205
206threadBridge = threading.Thread(target=bridge)
207threadBridge.start()
208bridgeStarted.wait()
209
210sock0.send(CAN(flags='extended', identifier=0x10010000, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'))
211sock0.send(CAN(flags='extended', identifier=0x10020000, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'))
212sock0.send(CAN(flags='extended', identifier=0x10000000, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'))
213sock0.send(CAN(flags='extended', identifier=0x10030000, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'))
214sock0.send(CAN(flags='extended', identifier=0x10040000, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'))
215sock0.send(CAN(flags='extended', identifier=0x10000000, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'))
216
217packetsVCan1 = sock1.sniff(timeout=0.5, count=6)
218assert len(packetsVCan1) == 6
219
220sock1.close()
221sock0.close()
222
223threadBridge.join(timeout=3)
224
225= PythonCANSocket bridge and sniff setup vcan1 package forwarding
226
227sock0 = PythonCANSocket(bustype='socketcan', channel='vcan0')
228sock1 = PythonCANSocket(bustype='socketcan', channel='vcan1')
229
230bridgeStarted = threading.Event()
231def bridge():
232    global bridgeStarted
233    bSock0 = PythonCANSocket(
234        bustype='socketcan', channel='vcan0',
235                                bitrate=250000)
236    bSock1 = PythonCANSocket(
237        bustype='socketcan', channel='vcan1',
238                                bitrate=250000)
239    def pnr(pkt):
240        return pkt
241    bSock0.timeout = 0.01
242    bSock1.timeout = 0.01
243    bridge_and_sniff(if1=bSock0, if2=bSock1, xfrm12=pnr, xfrm21=pnr, timeout=0.5, started_callback=bridgeStarted.set, count=6)
244    bSock0.close()
245    bSock1.close()
246
247threadBridge = threading.Thread(target=bridge)
248threadBridge.start()
249bridgeStarted.wait()
250
251sock0.send(CAN(flags='extended', identifier=0x10010000, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'))
252sock0.send(CAN(flags='extended', identifier=0x10020000, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'))
253sock0.send(CAN(flags='extended', identifier=0x10000000, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'))
254sock0.send(CAN(flags='extended', identifier=0x10030000, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'))
255sock0.send(CAN(flags='extended', identifier=0x10040000, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'))
256sock0.send(CAN(flags='extended', identifier=0x10000000, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'))
257
258packetsVCan1 = sock1.sniff(timeout=0.5, count=6)
259assert len(packetsVCan1) == 6
260
261sock1.close()
262sock0.close()
263
264threadBridge.join(timeout=3)
265
266+ Cleanup
267
268= Delete vcan interfaces
269
270if 0 != call(["sudo", "ip" ,"link", "delete", "vcan0"]):
271        raise Exception("vcan0 could not be deleted")
272
273if 0 != call(["sudo", "ip" ,"link", "delete", "vcan1"]):
274        raise Exception("vcan1 could not be deleted")
275