• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1% Regression tests for the CANSocket
2~ vcan_socket linux needs_root not_pypy
3
4# More information at http://www.secdev.org/projects/UTscapy/
5
6
7############
8############
9+ Configuration of CAN virtual sockets
10
11= Load module
12~ conf
13
14conf.contribs['CAN'] = {'swap-bytes': False, 'remove-padding': True}
15load_layer("can", globals_dict=globals())
16conf.contribs['CANSocket'] = {'use-python-can': True}
17from scapy.contrib.cansocket_python_can import *
18
19= Setup string for vcan
20~ conf command
21bashCommand = "/bin/bash -c 'sudo modprobe vcan; sudo ip link add name vcan0 type vcan; sudo ip link set dev vcan0 up'"
22
23= Load os
24~ conf command
25
26import os
27import threading
28from subprocess import call
29
30= Setup vcan0
31~ conf command
32
330 == os.system(bashCommand)
34
35= Define common used functions
36
37send_done = threading.Event()
38
39def sender(sock, msg):
40    if not hasattr(msg, "__iter__"):
41        msg = [msg]
42    for m in msg:
43        sock.send(m)
44    send_done.set()
45
46+ Basic Packet Tests()
47= CAN Packet init
48
49canframe = CANFD(identifier=0x7ff,length=10,data=b'\x01\x02\x03\x04\x05\x06\x07\x08ab')
50bytes(canframe) == b'\x00\x00\x07\xff\x0c\x04\x00\x00\x01\x02\x03\x04\x05\x06\x07\x08ab\x00\x00'
51
52+ Basic Socket Tests()
53= CAN Socket Init
54
55sock1 = CANSocket(bustype='socketcan', channel='vcan0', fd=True)
56sock1.close()
57del sock1
58sock1 = None
59assert sock1 == None
60
61= CAN Socket send recv small packet
62
63sock1 = CANSocket(bustype='socketcan', channel='vcan0', fd=True)
64sock2 = CANSocket(bustype='socketcan', channel='vcan0', fd=True)
65
66sock2.send(CANFD(identifier=0x7ff,length=10,data=b'\x01'*10))
67sock2.send(CAN(identifier=0x7ff,length=1,data=b'\x01'))
68rx1 = sock1.recv()
69rx2 = sock1.recv()
70sock1.close()
71sock2.close()
72
73assert rx1 == CANFD(identifier=0x7ff,length=10,data=b'\x01'*10)
74assert rx2 == CAN(identifier=0x7ff,length=1,data=b'\x01')
75
76
77= CAN Socket send recv small packet test with
78
79with CANSocket(bustype='socketcan', channel='vcan0', fd=True) as sock1, \
80    CANSocket(bustype='socketcan', channel='vcan0', fd=True) as sock2:
81    sock2.send(CANFD(identifier=0x7ff,length=1,data=b'\x01'))
82    rx = sock1.recv()
83
84assert rx == CANFD(identifier=0x7ff,length=1,data=b'\x01')
85
86= CAN Socket basecls test
87
88with CANSocket(bustype='socketcan', channel='vcan0', fd=True) as sock1, \
89    CANSocket(bustype='socketcan', channel='vcan0', fd=True) as sock2:
90    sock1.basecls = Raw
91    sock2.send(CANFD(identifier=0x7ff,length=8,data=b'\x01\x02\x03\x04\x05\x06\x07\x08'))
92    rx = sock1.recv()
93    assert rx == Raw(bytes(CANFD(identifier=0x7ff,length=8,data=b'\x01\x02\x03\x04\x05\x06\x07\x08')))
94
95= CAN Socket send recv swapped
96
97conf.contribs['CAN']['swap-bytes'] = True
98
99with CANSocket(bustype='socketcan', channel='vcan0', fd=True) as sock1, \
100    CANSocket(bustype='socketcan', channel='vcan0', fd=True) as sock2:
101    sock2.send(CANFD(identifier=0x7ff,length=64,data=b'\x01' * 64))
102    sock1.basecls = CAN
103    rx = sock1.recv()
104    assert rx == CANFD(identifier=0x7ff,length=64,data=b'\x01' * 64)
105
106conf.contribs['CAN']['swap-bytes'] = False
107
108= sniff with filtermask 0x7ff
109
110msgs = [CANFD(identifier=0x200, length=64, data=b'\x01\x02\x03\x04\x05\x06\x07\x08' * 8),
111        CANFD(identifier=0x300, length=64, data=b'\x01\x02\x03\x04\x05\x06\x07\x08' * 8),
112        CANFD(identifier=0x300, length=64, data=b'\x01\x02\x03\x04\x05\x06\x07\x08' * 8),
113        CANFD(identifier=0x200, length=64, data=b'\x01\x02\x03\x04\x05\x06\x07\x08' * 8),
114        CANFD(identifier=0x100, length=64, data=b'\x01\x02\x03\x04\x05\x06\x07\x08' * 8),
115        CANFD(identifier=0x200, length=64, data=b'\x01\x02\x03\x04\x05\x06\x07\x08' * 8)]
116
117with CANSocket(bustype='socketcan', channel='vcan0', fd=True, can_filters=[{'can_id': 0x200, 'can_mask': 0x7ff}]) as sock1, \
118        CANSocket(bustype='socketcan', channel='vcan0', fd=True) as sock2:
119    for m in msgs:
120        sock2.send(m)
121    packets = sock1.sniff(timeout=0.1, count=3)
122    assert len(packets) == 3
123
124
125+ bridge and sniff tests
126= bridge and sniff setup vcan1 package forwarding
127
128bashCommand = "/bin/bash -c 'sudo ip link add name vcan1 type vcan; sudo ip link set dev vcan1 up'"
129assert 0 == os.system(bashCommand)
130
131sock0 = CANSocket(bustype='socketcan', channel='vcan0', fd=True)
132sock1 = CANSocket(bustype='socketcan', channel='vcan1', fd=True)
133
134bridgeStarted = threading.Event()
135def bridge():
136    global bridgeStarted
137    bSock0 = CANSocket(
138        bustype='socketcan', channel='vcan0', bitrate=250000, fd=True)
139    bSock1 = CANSocket(
140        bustype='socketcan', channel='vcan1', bitrate=250000, fd=True)
141    def pnr(pkt):
142        return pkt
143    bSock0.timeout = 0.01
144    bSock1.timeout = 0.01
145    bridge_and_sniff(if1=bSock0, if2=bSock1, xfrm12=pnr, xfrm21=pnr, timeout=0.5, started_callback=bridgeStarted.set, count=6)
146    bSock0.close()
147    bSock1.close()
148
149threadBridge = threading.Thread(target=bridge)
150threadBridge.start()
151bridgeStarted.wait(timeout=1)
152
153sock0.send(CANFD(flags='extended', identifier=0x10010000, length=64, data=b'\x01\x02\x03\x04\x05\x06\x07\x08' * 8))
154sock0.send(CANFD(flags='extended', identifier=0x10020000, length=64, data=b'\x01\x02\x03\x04\x05\x06\x07\x08' * 8))
155sock0.send(CANFD(flags='extended', identifier=0x10000000, length=64, data=b'\x01\x02\x03\x04\x05\x06\x07\x08' * 8))
156sock0.send(CANFD(flags='extended', identifier=0x10030000, length=64, data=b'\x01\x02\x03\x04\x05\x06\x07\x08' * 8))
157sock0.send(CANFD(flags='extended', identifier=0x10040000, length=64, data=b'\x01\x02\x03\x04\x05\x06\x07\x08' * 8))
158sock0.send(CANFD(flags='extended', identifier=0x10000000, length=64, data=b'\x01\x02\x03\x04\x05\x06\x07\x08' * 8))
159
160packetsVCan1 = sock1.sniff(timeout=0.5, count=6)
161assert len(packetsVCan1) == 6
162
163sock1.close()
164sock0.close()
165
166threadBridge.join(timeout=3)
167assert not threadBridge.is_alive()
168
169
170= Delete vcan interfaces
171~ needs_root linux vcan_socket
172
173if 0 != call(["sudo", "ip" ,"link", "delete", "vcan0"]):
174        raise Exception("vcan0 could not be deleted")
175
176if 0 != call(["sudo", "ip" ,"link", "delete", "vcan1"]):
177        raise Exception("vcan1 could not be deleted")
178