1% Regression tests for nativecanfdsocket 2~ not_pypy vcan_socket needs_root linux 3 4# More information at http://www.secdev.org/projects/UTscapy/ 5 6 7############ 8############ 9+ Configuration of CAN virtual sockets 10~ conf 11 12= Load module 13load_layer("can", globals_dict=globals()) 14conf.contribs['CANSocket'] = {'use-python-can': False} 15from scapy.contrib.cansocket_native import * 16conf.contribs['CAN'] = {'swap-bytes': False, 'remove-padding': True} 17 18 19= Setup string for vcan 20bashCommand = "/bin/bash -c 'sudo modprobe vcan; sudo ip link add name vcan0 type vcan; sudo ip link set dev vcan0 up'" 21 22= Load os 23import os 24import threading 25from time import sleep 26from subprocess import call 27 28= Setup vcan0 29assert 0 == os.system(bashCommand) 30 31+ Basic Packet Tests() 32= CAN FD Packet init 33canfdframe = CANFD(identifier=0x7ff,length=8,data=b'\x01\x02\x03\x04\x05\x06\x07\x08\xaa') 34assert bytes(canfdframe) == b'\x00\x00\x07\xff\x08\x04\x00\x00\x01\x02\x03\x04\x05\x06\x07\x08\xaa' 35 36+ Basic Socket Tests() 37= CAN FD Socket Init 38sock1 = CANSocket(channel="vcan0", fd=True) 39 40= CAN Socket send recv small packet without remove padding 41 42conf.contribs['CAN'] = {'swap-bytes': False, 'remove-padding': False} 43 44sock2 = CANSocket(channel="vcan0", fd=True) 45sock2.send(CANFD(identifier=0x7ff,length=9,data=b'\x01\x02\x03\x04\x05\x06\x07\x08\xaa')) 46sock2.send(CAN(identifier=0x7ff,length=8,data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) 47sock2.close() 48 49rx = sock1.recv() 50assert rx == CANFD(identifier=0x7ff,length=12,data=b'\x01\x02\x03\x04\x05\x06\x07\x08\xaa\x00\x00\x00') / Padding(b"\x00" * (64 - 12)) 51rx = sock1.recv() 52# different Kernel Versions produce different packets 53hexdump(rx) 54test = CANFD(identifier=0x7ff, fd_flags=0, length=8,data=b'\x01\x02\x03\x04\x05\x06\x07\x08') / Padding(b"\x00" * (64 - 8)) 55hexdump(test) 56test2 = CANFD(identifier=0x7ff,fd_flags=4, length=8,data=b'\x01\x02\x03\x04\x05\x06\x07\x08') / Padding(b"\x00" * (64 - 8)) 57hexdump(test2) 58assert bytes(rx) in [bytes(test), bytes(test2)] 59 60= CAN Socket send recv 61 62conf.contribs['CAN'] = {'swap-bytes': False, 'remove-padding': True} 63 64sock2 = CANSocket(channel="vcan0", fd=True) 65sock2.send(CANFD(identifier=0x7ff,length=9,data=b'\x01\x02\x03\x04\x05\x06\x07\x08\xaa')) 66sock2.close() 67 68rx = sock1.recv() 69assert rx == CANFD(identifier=0x7ff,length=12, fd_flags=4, data=b'\x01\x02\x03\x04\x05\x06\x07\x08\xaa\x00\x00\x00') 70 71= CAN Socket basecls test 72 73 74sock2 = CANSocket(channel="vcan0", fd=True) 75sock2.send(CANFD(identifier=0x7ff,length=9,data=b'\x01\x02\x03\x04\x05\x06\x07\x08\xaa')) 76sock2.close() 77 78sock1.basecls = Raw 79rx = sock1.recv() 80assert rx.load == bytes(CANFD(identifier=0x7ff, fd_flags=4, length=12,data=b'\x01\x02\x03\x04\x05\x06\x07\x08\xaa\x00\x00\x00' + b'\x00' * (64 - 12))) 81 82= sniff with filtermask 0x1FFFFFFF and inverse filter 83 84 85sock1 = CANSocket(channel='vcan0', fd=True, can_filters=[{'can_id': 0x10000000 | CAN_INV_FILTER, 'can_mask': 0x1fffffff}]) 86 87sock2 = CANSocket(channel="vcan0", fd=True) 88sock2.send(CANFD(flags='extended', identifier=0x10010000, length=10, data=b'\x01\x02\x03\x04\x05\x06\x07\x08ab')) 89sock2.send(CANFD(flags='extended', identifier=0x10020000, length=10, data=b'\x01\x02\x03\x04\x05\x06\x07\x08ab')) 90sock2.send(CANFD(flags='extended', identifier=0x10000000, length=10, data=b'\x01\x02\x03\x04\x05\x06\x07\x08ab')) 91sock2.send(CANFD(flags='extended', identifier=0x10030000, length=10, data=b'\x01\x02\x03\x04\x05\x06\x07\x08ab')) 92sock2.send(CANFD(flags='extended', identifier=0x10040000, length=10, data=b'\x01\x02\x03\x04\x05\x06\x07\x08ab')) 93sock2.send(CANFD(flags='extended', identifier=0x10000000, length=10, data=b'\x01\x02\x03\x04\x05\x06\x07\x08ab')) 94sock2.close() 95 96packets = sock1.sniff(timeout=0.1, verbose=False, count=4) 97assert len(packets) == 4 98 99sock1.close() 100 101+ bridge and sniff tests 102 103= bridge and sniff setup vcan1 package forwarding 104 105 106bashCommand = "/bin/bash -c 'sudo ip link add name vcan1 type vcan; sudo ip link set dev vcan1 up'" 107assert 0 == os.system(bashCommand) 108 109sock0 = CANSocket(channel='vcan0', fd=True) 110sock1 = CANSocket(channel='vcan1', fd=True) 111 112bridgeStarted = threading.Event() 113 114def bridge(): 115 global bridgeStarted 116 bSock0 = CANSocket(channel="vcan0", fd=True) 117 bSock1 = CANSocket(channel='vcan1', fd=True) 118 def pnr(pkt): 119 return pkt 120 bridgeStarted.set() 121 bridge_and_sniff(if1=bSock0, if2=bSock1, xfrm12=pnr, xfrm21=pnr, timeout=0.2, verbose=False, count=6) 122 bSock0.close() 123 bSock1.close() 124 125threadBridge = threading.Thread(target=bridge) 126threadBridge.start() 127bridgeStarted.wait(timeout=5) 128sock0.send(CANFD(flags='extended', identifier=0x10010000, length=10, data=b'\x01\x02\x03\x04\x05\x06\x07\x0842')) 129sock0.send(CANFD(flags='extended', identifier=0x10020000, length=10, data=b'\x01\x02\x03\x04\x05\x06\x07\x0842')) 130sock0.send(CANFD(flags='extended', identifier=0x10000000, length=10, data=b'\x01\x02\x03\x04\x05\x06\x07\x0842')) 131sock0.send(CANFD(flags='extended', identifier=0x10030000, length=10, data=b'\x01\x02\x03\x04\x05\x06\x07\x0842')) 132sock0.send(CANFD(flags='extended', identifier=0x10040000, length=10, data=b'\x01\x02\x03\x04\x05\x06\x07\x0842')) 133sock0.send(CANFD(flags='extended', identifier=0x10000000, length=10, data=b'\x01\x02\x03\x04\x05\x06\x07\x0842')) 134 135packetsVCan1 = sock1.sniff(timeout=0.1, verbose=False, count=6) 136assert len(packetsVCan1) == 6 137 138threadBridge.join(timeout=5) 139assert not threadBridge.is_alive() 140 141sock1.close() 142sock0.close() 143 144 145= Delete vcan interfaces 146 147if 0 != call(["sudo", "ip", "link", "delete", "vcan0"]): 148 raise Exception("vcan0 could not be deleted") 149 150if 0 != call(["sudo", "ip", "link", "delete", "vcan1"]): 151 raise Exception("vcan1 could not be deleted") 152 153