% Regression tests for nativecanfdsocket ~ not_pypy vcan_socket needs_root linux # More information at http://www.secdev.org/projects/UTscapy/ ############ ############ + Configuration of CAN virtual sockets ~ conf = Load module load_layer("can", globals_dict=globals()) conf.contribs['CANSocket'] = {'use-python-can': False} from scapy.contrib.cansocket_native import * conf.contribs['CAN'] = {'swap-bytes': False, 'remove-padding': True} = Setup string for vcan bashCommand = "/bin/bash -c 'sudo modprobe vcan; sudo ip link add name vcan0 type vcan; sudo ip link set dev vcan0 up'" = Load os import os import threading from time import sleep from subprocess import call = Setup vcan0 assert 0 == os.system(bashCommand) + Basic Packet Tests() = CAN FD Packet init canfdframe = CANFD(identifier=0x7ff,length=8,data=b'\x01\x02\x03\x04\x05\x06\x07\x08\xaa') assert bytes(canfdframe) == b'\x00\x00\x07\xff\x08\x04\x00\x00\x01\x02\x03\x04\x05\x06\x07\x08\xaa' + Basic Socket Tests() = CAN FD Socket Init sock1 = CANSocket(channel="vcan0", fd=True) = CAN Socket send recv small packet without remove padding conf.contribs['CAN'] = {'swap-bytes': False, 'remove-padding': False} sock2 = CANSocket(channel="vcan0", fd=True) sock2.send(CANFD(identifier=0x7ff,length=9,data=b'\x01\x02\x03\x04\x05\x06\x07\x08\xaa')) sock2.send(CAN(identifier=0x7ff,length=8,data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) sock2.close() rx = sock1.recv() assert rx == CANFD(identifier=0x7ff,length=12,data=b'\x01\x02\x03\x04\x05\x06\x07\x08\xaa\x00\x00\x00') / Padding(b"\x00" * (64 - 12)) rx = sock1.recv() # different Kernel Versions produce different packets hexdump(rx) test = CANFD(identifier=0x7ff, fd_flags=0, length=8,data=b'\x01\x02\x03\x04\x05\x06\x07\x08') / Padding(b"\x00" * (64 - 8)) hexdump(test) test2 = CANFD(identifier=0x7ff,fd_flags=4, length=8,data=b'\x01\x02\x03\x04\x05\x06\x07\x08') / Padding(b"\x00" * (64 - 8)) hexdump(test2) assert bytes(rx) in [bytes(test), bytes(test2)] = CAN Socket send recv conf.contribs['CAN'] = {'swap-bytes': False, 'remove-padding': True} sock2 = CANSocket(channel="vcan0", fd=True) sock2.send(CANFD(identifier=0x7ff,length=9,data=b'\x01\x02\x03\x04\x05\x06\x07\x08\xaa')) sock2.close() rx = sock1.recv() assert rx == CANFD(identifier=0x7ff,length=12, fd_flags=4, data=b'\x01\x02\x03\x04\x05\x06\x07\x08\xaa\x00\x00\x00') = CAN Socket basecls test sock2 = CANSocket(channel="vcan0", fd=True) sock2.send(CANFD(identifier=0x7ff,length=9,data=b'\x01\x02\x03\x04\x05\x06\x07\x08\xaa')) sock2.close() sock1.basecls = Raw rx = sock1.recv() assert rx.load == bytes(CANFD(identifier=0x7ff, fd_flags=4, length=12,data=b'\x01\x02\x03\x04\x05\x06\x07\x08\xaa\x00\x00\x00' + b'\x00' * (64 - 12))) = sniff with filtermask 0x1FFFFFFF and inverse filter sock1 = CANSocket(channel='vcan0', fd=True, can_filters=[{'can_id': 0x10000000 | CAN_INV_FILTER, 'can_mask': 0x1fffffff}]) sock2 = CANSocket(channel="vcan0", fd=True) sock2.send(CANFD(flags='extended', identifier=0x10010000, length=10, data=b'\x01\x02\x03\x04\x05\x06\x07\x08ab')) sock2.send(CANFD(flags='extended', identifier=0x10020000, length=10, data=b'\x01\x02\x03\x04\x05\x06\x07\x08ab')) sock2.send(CANFD(flags='extended', identifier=0x10000000, length=10, data=b'\x01\x02\x03\x04\x05\x06\x07\x08ab')) sock2.send(CANFD(flags='extended', identifier=0x10030000, length=10, data=b'\x01\x02\x03\x04\x05\x06\x07\x08ab')) sock2.send(CANFD(flags='extended', identifier=0x10040000, length=10, data=b'\x01\x02\x03\x04\x05\x06\x07\x08ab')) sock2.send(CANFD(flags='extended', identifier=0x10000000, length=10, data=b'\x01\x02\x03\x04\x05\x06\x07\x08ab')) sock2.close() packets = sock1.sniff(timeout=0.1, verbose=False, count=4) assert len(packets) == 4 sock1.close() + bridge and sniff tests = bridge and sniff setup vcan1 package forwarding bashCommand = "/bin/bash -c 'sudo ip link add name vcan1 type vcan; sudo ip link set dev vcan1 up'" assert 0 == os.system(bashCommand) sock0 = CANSocket(channel='vcan0', fd=True) sock1 = CANSocket(channel='vcan1', fd=True) bridgeStarted = threading.Event() def bridge(): global bridgeStarted bSock0 = CANSocket(channel="vcan0", fd=True) bSock1 = CANSocket(channel='vcan1', fd=True) def pnr(pkt): return pkt bridgeStarted.set() bridge_and_sniff(if1=bSock0, if2=bSock1, xfrm12=pnr, xfrm21=pnr, timeout=0.2, verbose=False, count=6) bSock0.close() bSock1.close() threadBridge = threading.Thread(target=bridge) threadBridge.start() bridgeStarted.wait(timeout=5) sock0.send(CANFD(flags='extended', identifier=0x10010000, length=10, data=b'\x01\x02\x03\x04\x05\x06\x07\x0842')) sock0.send(CANFD(flags='extended', identifier=0x10020000, length=10, data=b'\x01\x02\x03\x04\x05\x06\x07\x0842')) sock0.send(CANFD(flags='extended', identifier=0x10000000, length=10, data=b'\x01\x02\x03\x04\x05\x06\x07\x0842')) sock0.send(CANFD(flags='extended', identifier=0x10030000, length=10, data=b'\x01\x02\x03\x04\x05\x06\x07\x0842')) sock0.send(CANFD(flags='extended', identifier=0x10040000, length=10, data=b'\x01\x02\x03\x04\x05\x06\x07\x0842')) sock0.send(CANFD(flags='extended', identifier=0x10000000, length=10, data=b'\x01\x02\x03\x04\x05\x06\x07\x0842')) packetsVCan1 = sock1.sniff(timeout=0.1, verbose=False, count=6) assert len(packetsVCan1) == 6 threadBridge.join(timeout=5) assert not threadBridge.is_alive() sock1.close() sock0.close() = Delete vcan interfaces if 0 != call(["sudo", "ip", "link", "delete", "vcan0"]): raise Exception("vcan0 could not be deleted") if 0 != call(["sudo", "ip", "link", "delete", "vcan1"]): raise Exception("vcan1 could not be deleted")