• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1% Regression tests for nativecanfdsocket
2~ not_pypy vcan_socket needs_root linux
3
4# More information at http://www.secdev.org/projects/UTscapy/
5
6
7############
8############
9+ Configuration of CAN virtual sockets
10~ conf
11
12= Load module
13load_layer("can", globals_dict=globals())
14conf.contribs['CANSocket'] = {'use-python-can': False}
15from scapy.contrib.cansocket_native import *
16conf.contribs['CAN'] = {'swap-bytes': False, 'remove-padding': True}
17
18
19= Setup string for vcan
20bashCommand = "/bin/bash -c 'sudo modprobe vcan; sudo ip link add name vcan0 type vcan; sudo ip link set dev vcan0 up'"
21
22= Load os
23import os
24import threading
25from time import sleep
26from subprocess import call
27
28= Setup vcan0
29assert 0 == os.system(bashCommand)
30
31+ Basic Packet Tests()
32= CAN FD Packet init
33canfdframe = CANFD(identifier=0x7ff,length=8,data=b'\x01\x02\x03\x04\x05\x06\x07\x08\xaa')
34assert bytes(canfdframe) == b'\x00\x00\x07\xff\x08\x04\x00\x00\x01\x02\x03\x04\x05\x06\x07\x08\xaa'
35
36+ Basic Socket Tests()
37= CAN FD Socket Init
38sock1 = CANSocket(channel="vcan0", fd=True)
39
40= CAN Socket send recv small packet without remove padding
41
42conf.contribs['CAN'] = {'swap-bytes': False, 'remove-padding': False}
43
44sock2 = CANSocket(channel="vcan0", fd=True)
45sock2.send(CANFD(identifier=0x7ff,length=9,data=b'\x01\x02\x03\x04\x05\x06\x07\x08\xaa'))
46sock2.send(CAN(identifier=0x7ff,length=8,data=b'\x01\x02\x03\x04\x05\x06\x07\x08'))
47sock2.close()
48
49rx = sock1.recv()
50assert rx == CANFD(identifier=0x7ff,length=12,data=b'\x01\x02\x03\x04\x05\x06\x07\x08\xaa\x00\x00\x00') / Padding(b"\x00" * (64 - 12))
51rx = sock1.recv()
52# different Kernel Versions produce different packets
53hexdump(rx)
54test = CANFD(identifier=0x7ff, fd_flags=0, length=8,data=b'\x01\x02\x03\x04\x05\x06\x07\x08') / Padding(b"\x00" * (64 - 8))
55hexdump(test)
56test2 = CANFD(identifier=0x7ff,fd_flags=4, length=8,data=b'\x01\x02\x03\x04\x05\x06\x07\x08') / Padding(b"\x00" * (64 - 8))
57hexdump(test2)
58assert bytes(rx) in [bytes(test), bytes(test2)]
59
60= CAN Socket send recv
61
62conf.contribs['CAN'] = {'swap-bytes': False, 'remove-padding': True}
63
64sock2 = CANSocket(channel="vcan0", fd=True)
65sock2.send(CANFD(identifier=0x7ff,length=9,data=b'\x01\x02\x03\x04\x05\x06\x07\x08\xaa'))
66sock2.close()
67
68rx = sock1.recv()
69assert rx == CANFD(identifier=0x7ff,length=12, fd_flags=4, data=b'\x01\x02\x03\x04\x05\x06\x07\x08\xaa\x00\x00\x00')
70
71= CAN Socket basecls test
72
73
sock2 = CANSocket(channel="vcan0", fd=True)
sock2.send(CANFD(identifier=0x7ff,length=9,data=b'\x01\x02\x03\x04\x05\x06\x07\x08\xaa'))
sock2.close()

# With basecls set to Raw the received frame is not dissected, so the full
# 64-byte CAN FD payload area (data plus zero padding) is exposed in rx.load.
sock1.basecls = Raw
rx = sock1.recv()
# Last use of the socket opened in "CAN FD Socket Init": close it before the
# assertion so it is released even when the comparison below fails, and
# before the name sock1 is rebound to a new socket by the next test.
sock1.close()
assert rx.load == bytes(CANFD(identifier=0x7ff, fd_flags=4, length=12,data=b'\x01\x02\x03\x04\x05\x06\x07\x08\xaa\x00\x00\x00' + b'\x00' * (64 - 12)))
81
82= sniff with filtermask 0x1FFFFFFF and inverse filter
83
84
85sock1 = CANSocket(channel='vcan0', fd=True, can_filters=[{'can_id': 0x10000000 | CAN_INV_FILTER, 'can_mask': 0x1fffffff}])
86
87sock2 = CANSocket(channel="vcan0", fd=True)
88sock2.send(CANFD(flags='extended', identifier=0x10010000, length=10, data=b'\x01\x02\x03\x04\x05\x06\x07\x08ab'))
89sock2.send(CANFD(flags='extended', identifier=0x10020000, length=10, data=b'\x01\x02\x03\x04\x05\x06\x07\x08ab'))
90sock2.send(CANFD(flags='extended', identifier=0x10000000, length=10, data=b'\x01\x02\x03\x04\x05\x06\x07\x08ab'))
91sock2.send(CANFD(flags='extended', identifier=0x10030000, length=10, data=b'\x01\x02\x03\x04\x05\x06\x07\x08ab'))
92sock2.send(CANFD(flags='extended', identifier=0x10040000, length=10, data=b'\x01\x02\x03\x04\x05\x06\x07\x08ab'))
93sock2.send(CANFD(flags='extended', identifier=0x10000000, length=10, data=b'\x01\x02\x03\x04\x05\x06\x07\x08ab'))
94sock2.close()
95
96packets = sock1.sniff(timeout=0.1, verbose=False, count=4)
97assert len(packets) == 4
98
99sock1.close()
100
101+ bridge and sniff tests
102
103= bridge and sniff setup vcan1 packet forwarding
104
105
106bashCommand = "/bin/bash -c 'sudo ip link add name vcan1 type vcan; sudo ip link set dev vcan1 up'"
107assert 0 == os.system(bashCommand)
108
109sock0 = CANSocket(channel='vcan0', fd=True)
110sock1 = CANSocket(channel='vcan1', fd=True)
111
112bridgeStarted = threading.Event()
113
def bridge():
    """Bridge vcan0 and vcan1; intended to run as a thread target.
    Opens one CAN FD socket per interface, sets the ``bridgeStarted`` event
    once both exist, then calls ``bridge_and_sniff`` with a transform that
    returns every packet unchanged (timeout 0.2 s, count 6).
    Every socket that was opened is closed again, even when opening the
    second socket or the bridging call itself raises.
    """
    bSock0 = bSock1 = None
    try:
        bSock0 = CANSocket(channel="vcan0", fd=True)
        bSock1 = CANSocket(channel='vcan1', fd=True)
        def pnr(pkt):
            # Hand the packet back untouched in both directions.
            return pkt
        # Set the event only after both sockets exist; the main thread
        # waits on it before sending frames on vcan0.
        bridgeStarted.set()
        bridge_and_sniff(if1=bSock0, if2=bSock1, xfrm12=pnr, xfrm21=pnr, timeout=0.2, verbose=False, count=6)
    finally:
        # Release whatever was opened, regardless of how the bridge ended.
        for bSock in (bSock0, bSock1):
            if bSock is not None:
                bSock.close()
124
threadBridge = threading.Thread(target=bridge)
threadBridge.start()
# Event.wait() returns False on timeout. Fail right here if the bridge thread
# never signalled readiness instead of sending frames into a missing bridge
# and failing later on the packet count.
assert bridgeStarted.wait(timeout=5)
128sock0.send(CANFD(flags='extended', identifier=0x10010000, length=10, data=b'\x01\x02\x03\x04\x05\x06\x07\x0842'))
129sock0.send(CANFD(flags='extended', identifier=0x10020000, length=10, data=b'\x01\x02\x03\x04\x05\x06\x07\x0842'))
130sock0.send(CANFD(flags='extended', identifier=0x10000000, length=10, data=b'\x01\x02\x03\x04\x05\x06\x07\x0842'))
131sock0.send(CANFD(flags='extended', identifier=0x10030000, length=10, data=b'\x01\x02\x03\x04\x05\x06\x07\x0842'))
132sock0.send(CANFD(flags='extended', identifier=0x10040000, length=10, data=b'\x01\x02\x03\x04\x05\x06\x07\x0842'))
133sock0.send(CANFD(flags='extended', identifier=0x10000000, length=10, data=b'\x01\x02\x03\x04\x05\x06\x07\x0842'))
134
135packetsVCan1 = sock1.sniff(timeout=0.1, verbose=False, count=6)
136assert len(packetsVCan1) == 6
137
138threadBridge.join(timeout=5)
139assert not threadBridge.is_alive()
140
141sock1.close()
142sock0.close()
143
144
145= Delete vcan interfaces
146
# Attempt to delete every interface before reporting, so that a failure on
# vcan0 does not leave vcan1 behind for the next run.
failed = []
for iface in ("vcan0", "vcan1"):
    if 0 != call(["sudo", "ip", "link", "delete", iface]):
        failed.append(iface)

if failed:
    raise Exception("%s could not be deleted" % ", ".join(failed))
152
153