1% Regression tests for compatibility between NativeCANSocket and PythonCANSocket 2~ not_pypy vcan_socket needs_root linux 3 4# More information at http://www.secdev.org/projects/UTscapy/ 5 6 7############ 8############ 9+ Configuration of CAN virtual sockets 10~ conf 11 12= Load module 13load_layer("can", globals_dict=globals()) 14from scapy.contrib.cansocket_python_can import PythonCANSocket 15from scapy.contrib.cansocket_native import NativeCANSocket 16 17conf.contribs['CAN'] = {'swap-bytes': False, 'remove-padding': True} 18 19= Setup string for vcan 20bashCommand = "/bin/bash -c 'sudo modprobe vcan; sudo ip link add name vcan0 type vcan; sudo ip link set dev vcan0 up'" 21 22= Load os 23import os 24import threading 25from subprocess import call 26 27= Setup vcan0 28 29assert 0 == os.system(bashCommand) 30 31= Define common used functions 32 33send_done = threading.Event() 34 35def sender(sock, msg): 36 if not hasattr(msg, "__iter__"): 37 msg = [msg] 38 for m in msg: 39 sock.send(m) 40 send_done.set() 41 42+ Basic Socket Tests 43 44= NativeCANSocket send recv small packet 45 46sock1 = NativeCANSocket(bustype='socketcan', channel='vcan0') 47sock2 = NativeCANSocket(bustype='socketcan', channel='vcan0') 48 49sock2.send(CAN(identifier=0x7ff,length=1,data=b'\x01')) 50rx = sock1.recv() 51sock1.close() 52sock2.close() 53 54assert rx == CAN(identifier=0x7ff,length=1,data=b'\x01') 55 56= NativeCANSocket send recv small packet test with 57 58with NativeCANSocket(bustype='socketcan', channel='vcan0') as sock1, \ 59 NativeCANSocket(bustype='socketcan', channel='vcan0') as sock2: 60 sock2.send(CAN(identifier=0x7ff,length=1,data=b'\x01')) 61 rx = sock1.recv() 62 63assert rx == CAN(identifier=0x7ff,length=1,data=b'\x01') 64 65= PythonCANSocket send recv small packet 66 67sock1 = PythonCANSocket(bustype='socketcan', channel='vcan0') 68sock2 = PythonCANSocket(bustype='socketcan', channel='vcan0') 69 70sock2.send(CAN(identifier=0x7ff,length=1,data=b'\x01')) 71rx = sock1.recv() 72sock1.close() 73sock2.close() 74 75assert rx == CAN(identifier=0x7ff,length=1,data=b'\x01') 76 77= PythonCANSocket send recv small packet test with 78 79with PythonCANSocket(bustype='socketcan', channel='vcan0') as sock1, \ 80 PythonCANSocket(bustype='socketcan', channel='vcan0') as sock2: 81 sock2.send(CAN(identifier=0x7ff,length=1,data=b'\x01')) 82 rx = sock1.recv() 83 84assert rx == CAN(identifier=0x7ff,length=1,data=b'\x01') 85 86= NativeCANSocket send recv swapped 87 88conf.contribs['CAN']['swap-bytes'] = True 89 90with NativeCANSocket(bustype='socketcan', channel='vcan0') as sock1, \ 91 NativeCANSocket(bustype='socketcan', channel='vcan0') as sock2: 92 sock2.send(CAN(identifier=0x7ff,length=8,data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) 93 rx = sock1.sniff(count=1, timeout=1) 94 assert len(rx) == 1 95 assert rx[0] == CAN(identifier=0x7ff,length=8,data=b'\x01\x02\x03\x04\x05\x06\x07\x08') 96 97conf.contribs['CAN']['swap-bytes'] = False 98 99= PythonCANSocket send recv swapped 100 101conf.contribs['CAN']['swap-bytes'] = True 102 103with PythonCANSocket(bustype='socketcan', channel='vcan0') as sock1, \ 104 PythonCANSocket(bustype='socketcan', channel='vcan0') as sock2: 105 sock2.send(CAN(identifier=0x7ff,length=8,data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) 106 rx = sock1.sniff(count=1, timeout=1) 107 assert rx[0] == CAN(identifier=0x7ff,length=8,data=b'\x01\x02\x03\x04\x05\x06\x07\x08') 108 109conf.contribs['CAN']['swap-bytes'] = False 110 111= NativeCANSocket sniff with filtermask 0x7ff 112 113msgs = [CAN(identifier=0x200, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'), 114 CAN(identifier=0x300, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'), 115 CAN(identifier=0x300, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'), 116 CAN(identifier=0x200, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'), 117 CAN(identifier=0x100, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'), 118 CAN(identifier=0x200, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')] 119 120with NativeCANSocket(bustype='socketcan', channel='vcan0', can_filters=[{'can_id': 0x200, 'can_mask': 0x7ff}]) as sock1, \ 121 NativeCANSocket(bustype='socketcan', channel='vcan0') as sock2: 122 for m in msgs: 123 sock2.send(m) 124 packets = sock1.sniff(timeout=0.1, count=3) 125 assert len(packets) == 3 126 127= PythonCANSocket sniff with filtermask 0x7ff 128 129msgs = [CAN(identifier=0x200, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'), 130 CAN(identifier=0x300, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'), 131 CAN(identifier=0x300, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'), 132 CAN(identifier=0x200, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'), 133 CAN(identifier=0x100, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'), 134 CAN(identifier=0x200, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')] 135 136with PythonCANSocket(bustype='socketcan', channel='vcan0', can_filters=[{'can_id': 0x200, 'can_mask': 0x7ff}]) as sock1, \ 137 PythonCANSocket(bustype='socketcan', channel='vcan0') as sock2: 138 for m in msgs: 139 sock2.send(m) 140 packets = sock1.sniff(timeout=0.1, count=3) 141 assert len(packets) == 3 142 143= NativeCANSocket sniff with multiple filters 144 145msgs = [CAN(identifier=0x200, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'), 146 CAN(identifier=0x300, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'), 147 CAN(identifier=0x400, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'), 148 CAN(identifier=0x500, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'), 149 CAN(identifier=0x600, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'), 150 CAN(identifier=0x700, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'), 151 CAN(identifier=0x7ff, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')] 152 153with NativeCANSocket(bustype='socketcan', channel='vcan0', can_filters=[{'can_id': 0x200, 'can_mask': 0x7ff}, {'can_id': 0x400, 'can_mask': 0x7ff}, {'can_id': 0x600, 'can_mask': 0x7ff}, {'can_id': 0x7ff, 'can_mask': 0x7ff}]) as sock1, \ 154 NativeCANSocket(bustype='socketcan', channel='vcan0') as sock2: 155 for m in msgs: 156 sock2.send(m) 157 packets = sock1.sniff(timeout=0.1, count=4) 158 assert len(packets) == 4 159 160= PythonCANSocket sniff with multiple filters 161 162msgs = [CAN(identifier=0x200, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'), 163 CAN(identifier=0x300, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'), 164 CAN(identifier=0x400, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'), 165 CAN(identifier=0x500, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'), 166 CAN(identifier=0x600, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'), 167 CAN(identifier=0x700, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'), 168 CAN(identifier=0x7ff, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')] 169 170with PythonCANSocket(bustype='socketcan', channel='vcan0', can_filters=[{'can_id': 0x200, 'can_mask': 0x7ff}, {'can_id': 0x400, 'can_mask': 0x7ff}, {'can_id': 0x600, 'can_mask': 0x7ff}, {'can_id': 0x7ff, 'can_mask': 0x7ff}]) as sock1, \ 171 PythonCANSocket(bustype='socketcan', channel='vcan0') as sock2: 172 for m in msgs: 173 sock2.send(m) 174 packets = sock1.sniff(timeout=0.1, count=4) 175 assert len(packets) == 4 176 177+ bridge and sniff tests 178 179= Setup vcan1 interface 180 181bashCommand = "/bin/bash -c 'sudo ip link add name vcan1 type vcan; sudo ip link set dev vcan1 up'" 182assert 0 == os.system(bashCommand) 183 184= NativeCANSocket bridge and sniff setup vcan1 package forwarding 185 186sock0 = NativeCANSocket(bustype='socketcan', channel='vcan0') 187sock1 = NativeCANSocket(bustype='socketcan', channel='vcan1') 188 189bridgeStarted = threading.Event() 190def bridge(): 191 global bridgeStarted 192 bSock0 = NativeCANSocket( 193 bustype='socketcan', channel='vcan0', 194 bitrate=250000) 195 bSock1 = NativeCANSocket( 196 bustype='socketcan', channel='vcan1', 197 bitrate=250000) 198 def pnr(pkt): 199 return pkt 200 bSock0.timeout = 0.01 201 bSock1.timeout = 0.01 202 bridge_and_sniff(if1=bSock0, if2=bSock1, xfrm12=pnr, xfrm21=pnr, timeout=0.5, started_callback=bridgeStarted.set, count=6) 203 bSock0.close() 204 bSock1.close() 205 206threadBridge = threading.Thread(target=bridge) 207threadBridge.start() 208bridgeStarted.wait() 209 210sock0.send(CAN(flags='extended', identifier=0x10010000, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) 211sock0.send(CAN(flags='extended', identifier=0x10020000, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) 212sock0.send(CAN(flags='extended', identifier=0x10000000, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) 213sock0.send(CAN(flags='extended', identifier=0x10030000, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) 214sock0.send(CAN(flags='extended', identifier=0x10040000, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) 215sock0.send(CAN(flags='extended', identifier=0x10000000, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) 216 217packetsVCan1 = sock1.sniff(timeout=0.5, count=6) 218assert len(packetsVCan1) == 6 219 220sock1.close() 221sock0.close() 222 223threadBridge.join(timeout=3) 224 225= PythonCANSocket bridge and sniff setup vcan1 package forwarding 226 227sock0 = PythonCANSocket(bustype='socketcan', channel='vcan0') 228sock1 = PythonCANSocket(bustype='socketcan', channel='vcan1') 229 230bridgeStarted = threading.Event() 231def bridge(): 232 global bridgeStarted 233 bSock0 = PythonCANSocket( 234 bustype='socketcan', channel='vcan0', 235 bitrate=250000) 236 bSock1 = PythonCANSocket( 237 bustype='socketcan', channel='vcan1', 238 bitrate=250000) 239 def pnr(pkt): 240 return pkt 241 bSock0.timeout = 0.01 242 bSock1.timeout = 0.01 243 bridge_and_sniff(if1=bSock0, if2=bSock1, xfrm12=pnr, xfrm21=pnr, timeout=0.5, started_callback=bridgeStarted.set, count=6) 244 bSock0.close() 245 bSock1.close() 246 247threadBridge = threading.Thread(target=bridge) 248threadBridge.start() 249bridgeStarted.wait() 250 251sock0.send(CAN(flags='extended', identifier=0x10010000, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) 252sock0.send(CAN(flags='extended', identifier=0x10020000, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) 253sock0.send(CAN(flags='extended', identifier=0x10000000, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) 254sock0.send(CAN(flags='extended', identifier=0x10030000, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) 255sock0.send(CAN(flags='extended', identifier=0x10040000, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) 256sock0.send(CAN(flags='extended', identifier=0x10000000, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) 257 258packetsVCan1 = sock1.sniff(timeout=0.5, count=6) 259assert len(packetsVCan1) == 6 260 261sock1.close() 262sock0.close() 263 264threadBridge.join(timeout=3) 265 266+ Cleanup 267 268= Delete vcan interfaces 269 270if 0 != call(["sudo", "ip" ,"link", "delete", "vcan0"]): 271 raise Exception("vcan0 could not be deleted") 272 273if 0 != call(["sudo", "ip" ,"link", "delete", "vcan1"]): 274 raise Exception("vcan1 could not be deleted") 275