1% Regression tests for the CANSocket 2~ vcan_socket linux needs_root not_pypy 3 4# More information at http://www.secdev.org/projects/UTscapy/ 5 6 7############ 8############ 9+ Configuration of CAN virtual sockets 10 11= Load module 12~ conf 13 14conf.contribs['CAN'] = {'swap-bytes': False, 'remove-padding': True} 15load_layer("can", globals_dict=globals()) 16conf.contribs['CANSocket'] = {'use-python-can': True} 17from scapy.contrib.cansocket_python_can import * 18 19= Setup string for vcan 20~ conf command 21bashCommand = "/bin/bash -c 'sudo modprobe vcan; sudo ip link add name vcan0 type vcan; sudo ip link set dev vcan0 up'" 22 23= Load os 24~ conf command 25 26import os 27import threading 28from subprocess import call 29 30= Setup vcan0 31~ conf command 32 330 == os.system(bashCommand) 34 35= Define common used functions 36 37send_done = threading.Event() 38 39def sender(sock, msg): 40 if not hasattr(msg, "__iter__"): 41 msg = [msg] 42 for m in msg: 43 sock.send(m) 44 send_done.set() 45 46+ Basic Packet Tests() 47= CAN Packet init 48 49canframe = CANFD(identifier=0x7ff,length=10,data=b'\x01\x02\x03\x04\x05\x06\x07\x08ab') 50bytes(canframe) == b'\x00\x00\x07\xff\x0c\x04\x00\x00\x01\x02\x03\x04\x05\x06\x07\x08ab\x00\x00' 51 52+ Basic Socket Tests() 53= CAN Socket Init 54 55sock1 = CANSocket(bustype='socketcan', channel='vcan0', fd=True) 56sock1.close() 57del sock1 58sock1 = None 59assert sock1 == None 60 61= CAN Socket send recv small packet 62 63sock1 = CANSocket(bustype='socketcan', channel='vcan0', fd=True) 64sock2 = CANSocket(bustype='socketcan', channel='vcan0', fd=True) 65 66sock2.send(CANFD(identifier=0x7ff,length=10,data=b'\x01'*10)) 67sock2.send(CAN(identifier=0x7ff,length=1,data=b'\x01')) 68rx1 = sock1.recv() 69rx2 = sock1.recv() 70sock1.close() 71sock2.close() 72 73assert rx1 == CANFD(identifier=0x7ff,length=10,data=b'\x01'*10) 74assert rx2 == CAN(identifier=0x7ff,length=1,data=b'\x01') 75 76 77= CAN Socket send recv small packet test with 78 79with CANSocket(bustype='socketcan', channel='vcan0', fd=True) as sock1, \ 80 CANSocket(bustype='socketcan', channel='vcan0', fd=True) as sock2: 81 sock2.send(CANFD(identifier=0x7ff,length=1,data=b'\x01')) 82 rx = sock1.recv() 83 84assert rx == CANFD(identifier=0x7ff,length=1,data=b'\x01') 85 86= CAN Socket basecls test 87 88with CANSocket(bustype='socketcan', channel='vcan0', fd=True) as sock1, \ 89 CANSocket(bustype='socketcan', channel='vcan0', fd=True) as sock2: 90 sock1.basecls = Raw 91 sock2.send(CANFD(identifier=0x7ff,length=8,data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) 92 rx = sock1.recv() 93 assert rx == Raw(bytes(CANFD(identifier=0x7ff,length=8,data=b'\x01\x02\x03\x04\x05\x06\x07\x08'))) 94 95= CAN Socket send recv swapped 96 97conf.contribs['CAN']['swap-bytes'] = True 98 99with CANSocket(bustype='socketcan', channel='vcan0', fd=True) as sock1, \ 100 CANSocket(bustype='socketcan', channel='vcan0', fd=True) as sock2: 101 sock2.send(CANFD(identifier=0x7ff,length=64,data=b'\x01' * 64)) 102 sock1.basecls = CAN 103 rx = sock1.recv() 104 assert rx == CANFD(identifier=0x7ff,length=64,data=b'\x01' * 64) 105 106conf.contribs['CAN']['swap-bytes'] = False 107 108= sniff with filtermask 0x7ff 109 110msgs = [CANFD(identifier=0x200, length=64, data=b'\x01\x02\x03\x04\x05\x06\x07\x08' * 8), 111 CANFD(identifier=0x300, length=64, data=b'\x01\x02\x03\x04\x05\x06\x07\x08' * 8), 112 CANFD(identifier=0x300, length=64, data=b'\x01\x02\x03\x04\x05\x06\x07\x08' * 8), 113 CANFD(identifier=0x200, length=64, data=b'\x01\x02\x03\x04\x05\x06\x07\x08' * 8), 114 CANFD(identifier=0x100, length=64, data=b'\x01\x02\x03\x04\x05\x06\x07\x08' * 8), 115 CANFD(identifier=0x200, length=64, data=b'\x01\x02\x03\x04\x05\x06\x07\x08' * 8)] 116 117with CANSocket(bustype='socketcan', channel='vcan0', fd=True, can_filters=[{'can_id': 0x200, 'can_mask': 0x7ff}]) as sock1, \ 118 CANSocket(bustype='socketcan', channel='vcan0', fd=True) as sock2: 119 for m in msgs: 120 sock2.send(m) 121 packets = sock1.sniff(timeout=0.1, count=3) 122 assert len(packets) == 3 123 124 125+ bridge and sniff tests 126= bridge and sniff setup vcan1 package forwarding 127 128bashCommand = "/bin/bash -c 'sudo ip link add name vcan1 type vcan; sudo ip link set dev vcan1 up'" 129assert 0 == os.system(bashCommand) 130 131sock0 = CANSocket(bustype='socketcan', channel='vcan0', fd=True) 132sock1 = CANSocket(bustype='socketcan', channel='vcan1', fd=True) 133 134bridgeStarted = threading.Event() 135def bridge(): 136 global bridgeStarted 137 bSock0 = CANSocket( 138 bustype='socketcan', channel='vcan0', bitrate=250000, fd=True) 139 bSock1 = CANSocket( 140 bustype='socketcan', channel='vcan1', bitrate=250000, fd=True) 141 def pnr(pkt): 142 return pkt 143 bSock0.timeout = 0.01 144 bSock1.timeout = 0.01 145 bridge_and_sniff(if1=bSock0, if2=bSock1, xfrm12=pnr, xfrm21=pnr, timeout=0.5, started_callback=bridgeStarted.set, count=6) 146 bSock0.close() 147 bSock1.close() 148 149threadBridge = threading.Thread(target=bridge) 150threadBridge.start() 151bridgeStarted.wait(timeout=1) 152 153sock0.send(CANFD(flags='extended', identifier=0x10010000, length=64, data=b'\x01\x02\x03\x04\x05\x06\x07\x08' * 8)) 154sock0.send(CANFD(flags='extended', identifier=0x10020000, length=64, data=b'\x01\x02\x03\x04\x05\x06\x07\x08' * 8)) 155sock0.send(CANFD(flags='extended', identifier=0x10000000, length=64, data=b'\x01\x02\x03\x04\x05\x06\x07\x08' * 8)) 156sock0.send(CANFD(flags='extended', identifier=0x10030000, length=64, data=b'\x01\x02\x03\x04\x05\x06\x07\x08' * 8)) 157sock0.send(CANFD(flags='extended', identifier=0x10040000, length=64, data=b'\x01\x02\x03\x04\x05\x06\x07\x08' * 8)) 158sock0.send(CANFD(flags='extended', identifier=0x10000000, length=64, data=b'\x01\x02\x03\x04\x05\x06\x07\x08' * 8)) 159 160packetsVCan1 = sock1.sniff(timeout=0.5, count=6) 161assert len(packetsVCan1) == 6 162 163sock1.close() 164sock0.close() 165 166threadBridge.join(timeout=3) 167assert not threadBridge.is_alive() 168 169 170= Delete vcan interfaces 171~ needs_root linux vcan_socket 172 173if 0 != call(["sudo", "ip" ,"link", "delete", "vcan0"]): 174 raise Exception("vcan0 could not be deleted") 175 176if 0 != call(["sudo", "ip" ,"link", "delete", "vcan1"]): 177 raise Exception("vcan1 could not be deleted") 178