% Regression tests for the CANSocket ~ vcan_socket linux needs_root not_pypy # More information at http://www.secdev.org/projects/UTscapy/ ############ ############ + Configuration of CAN virtual sockets = Load module ~ conf conf.contribs['CAN'] = {'swap-bytes': False, 'remove-padding': True} load_layer("can", globals_dict=globals()) conf.contribs['CANSocket'] = {'use-python-can': True} from scapy.contrib.cansocket_python_can import * = Setup string for vcan ~ conf command bashCommand = "/bin/bash -c 'sudo modprobe vcan; sudo ip link add name vcan0 type vcan; sudo ip link set dev vcan0 up'" = Load os ~ conf command import os import threading from subprocess import call = Setup vcan0 ~ conf command 0 == os.system(bashCommand) = Define common used functions send_done = threading.Event() def sender(sock, msg): if not hasattr(msg, "__iter__"): msg = [msg] for m in msg: sock.send(m) send_done.set() + Basic Packet Tests() = CAN Packet init canframe = CAN(identifier=0x7ff,length=8,data=b'\x01\x02\x03\x04\x05\x06\x07\x08') bytes(canframe) == b'\x00\x00\x07\xff\x08\x00\x00\x00\x01\x02\x03\x04\x05\x06\x07\x08' + Basic Socket Tests() = CAN Socket Init sock1 = CANSocket(bustype='socketcan', channel='vcan0') sock1.close() del sock1 sock1 = None = CAN Socket send recv small packet sock1 = CANSocket(bustype='socketcan', channel='vcan0') sock2 = CANSocket(bustype='socketcan', channel='vcan0') sock2.send(CAN(identifier=0x7ff,length=1,data=b'\x01')) rx = sock1.recv() sock1.close() sock2.close() assert rx == CAN(identifier=0x7ff,length=1,data=b'\x01') = CAN Socket send recv small packet test with with CANSocket(bustype='socketcan', channel='vcan0') as sock1, \ CANSocket(bustype='socketcan', channel='vcan0') as sock2: sock2.send(CAN(identifier=0x7ff,length=1,data=b'\x01')) rx = sock1.recv() assert rx == CAN(identifier=0x7ff,length=1,data=b'\x01') = CAN Socket send recv ISOTP_Packet from scapy.contrib.isotp import ISOTPHeader, ISOTP_FF with CANSocket(bustype='socketcan', channel='vcan0') as sock1, \ CANSocket(bustype='socketcan', channel='vcan0') as sock2: sock2.send(ISOTPHeader(identifier=0x7ff)/ISOTP_FF(message_size=100, data=b'abcdef')) rx = sock1.recv() assert rx == CAN(identifier=0x7ff,length=8,data=b'\x10\x64abcdef') = CAN Socket basecls test with CANSocket(bustype='socketcan', channel='vcan0') as sock1, \ CANSocket(bustype='socketcan', channel='vcan0') as sock2: sock1.basecls = Raw sock2.send(CAN(identifier=0x7ff,length=8,data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) rx = sock1.recv() assert rx == Raw(bytes(CAN(identifier=0x7ff,length=8,data=b'\x01\x02\x03\x04\x05\x06\x07\x08'))) = CAN Socket send recv with CANSocket(bustype='socketcan', channel='vcan0') as sock1, \ CANSocket(bustype='socketcan', channel='vcan0') as sock2: sock2.send(CAN(identifier=0x7ff,length=8,data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) sock1.basecls = CAN rx = sock1.recv() assert rx == CAN(identifier=0x7ff,length=8,data=b'\x01\x02\x03\x04\x05\x06\x07\x08') = CAN Socket send recv swapped conf.contribs['CAN']['swap-bytes'] = True with CANSocket(bustype='socketcan', channel='vcan0') as sock1, \ CANSocket(bustype='socketcan', channel='vcan0') as sock2: sock2.send(CAN(identifier=0x7ff,length=8,data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) sock1.basecls = CAN rx = sock1.recv() assert rx == CAN(identifier=0x7ff,length=8,data=b'\x01\x02\x03\x04\x05\x06\x07\x08') conf.contribs['CAN']['swap-bytes'] = False = sniff with filtermask 0x7ff msgs = [CAN(identifier=0x200, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'), CAN(identifier=0x300, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'), CAN(identifier=0x300, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'), CAN(identifier=0x200, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'), CAN(identifier=0x100, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'), CAN(identifier=0x200, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')] with CANSocket(bustype='socketcan', channel='vcan0', can_filters=[{'can_id': 0x200, 'can_mask': 0x7ff}]) as sock1, \ CANSocket(bustype='socketcan', channel='vcan0') as sock2: for m in msgs: sock2.send(m) packets = sock1.sniff(timeout=0.1, count=3) assert len(packets) == 3 = sniff with filtermask 0x700 msgs = [CAN(identifier=0x212, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'), CAN(identifier=0x300, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'), CAN(identifier=0x2ff, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'), CAN(identifier=0x1ff, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'), CAN(identifier=0x200, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'), CAN(identifier=0x2aa, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')] with CANSocket(bustype='socketcan', channel='vcan0', can_filters=[{'can_id': 0x200, 'can_mask': 0x700}]) as sock1, \ CANSocket(bustype='socketcan', channel='vcan0') as sock2: for m in msgs: sock2.send(m) packets = sock1.sniff(timeout=0.1, count=4) assert len(packets) == 4 = sniff with filtermask 0x0ff msgs = [CAN(identifier=0x200, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'), CAN(identifier=0x301, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'), CAN(identifier=0x300, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'), CAN(identifier=0x1ff, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'), CAN(identifier=0x700, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'), CAN(identifier=0x100, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')] with CANSocket(bustype='socketcan', channel='vcan0', can_filters=[{'can_id': 0x200, 'can_mask': 0xff}]) as sock1, \ CANSocket(bustype='socketcan', channel='vcan0') as sock2: for m in msgs: sock2.send(m) packets = sock1.sniff(timeout=0.1, count=4) assert len(packets) == 4 = sniff with multiple filters msgs = [CAN(identifier=0x200, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'), CAN(identifier=0x300, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'), CAN(identifier=0x400, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'), CAN(identifier=0x500, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'), CAN(identifier=0x600, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'), CAN(identifier=0x700, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'), CAN(identifier=0x7ff, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')] with CANSocket(bustype='socketcan', channel='vcan0', can_filters=[{'can_id': 0x200, 'can_mask': 0x7ff}, {'can_id': 0x400, 'can_mask': 0x7ff}, {'can_id': 0x600, 'can_mask': 0x7ff}, {'can_id': 0x7ff, 'can_mask': 0x7ff}]) as sock1, \ CANSocket(bustype='socketcan', channel='vcan0') as sock2: for m in msgs: sock2.send(m) packets = sock1.sniff(timeout=0.1, count=4) assert len(packets) == 4 = sniff with filtermask 0x7ff msgs = [CAN(identifier=0x200, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'), CAN(identifier=0x200, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'), CAN(identifier=0x300, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'), CAN(identifier=0x200, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'), CAN(identifier=0x100, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'), CAN(identifier=0x200, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')] with CANSocket(bustype='socketcan', channel='vcan0', can_filters=[{'can_id': 0x200, 'can_mask': 0x7ff}]) as sock1, \ CANSocket(bustype='socketcan', channel='vcan0') as sock2: for m in msgs: sock2.send(m) packets = sock1.sniff(timeout=0.1, count=4) assert len(packets) == 4 = sniff with filtermask 0x1FFFFFFF msgs = [CAN(flags='extended', identifier=0x10010000, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'), CAN(flags='extended', identifier=0x10020000, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'), CAN(flags='extended', identifier=0x10000000, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'), CAN(flags='extended', identifier=0x10030000, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'), CAN(flags='extended', identifier=0x10040000, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'), CAN(flags='extended', identifier=0x10000000, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')] with CANSocket(bustype='socketcan', channel='vcan0', can_filters=[{'can_id': 0x10000000, 'can_mask': 0x1fffffff}]) as sock1, \ CANSocket(bustype='socketcan', channel='vcan0') as sock2: for m in msgs: sock2.send(m) packets = sock1.sniff(timeout=0.1, count=2) assert len(packets) == 2 + bridge and sniff tests = bridge and sniff setup vcan1 package forwarding bashCommand = "/bin/bash -c 'sudo ip link add name vcan1 type vcan; sudo ip link set dev vcan1 up'" assert 0 == os.system(bashCommand) sock0 = CANSocket(bustype='socketcan', channel='vcan0') sock1 = CANSocket(bustype='socketcan', channel='vcan1') bridgeStarted = threading.Event() def bridge(): global bridgeStarted bSock0 = CANSocket( bustype='socketcan', channel='vcan0', bitrate=250000) bSock1 = CANSocket( bustype='socketcan', channel='vcan1', bitrate=250000) def pnr(pkt): return pkt bSock0.timeout = 0.01 bSock1.timeout = 0.01 bridge_and_sniff(if1=bSock0, if2=bSock1, xfrm12=pnr, xfrm21=pnr, timeout=0.5, started_callback=bridgeStarted.set, count=6) bSock0.close() bSock1.close() threadBridge = threading.Thread(target=bridge) threadBridge.start() bridgeStarted.wait(timeout=1) sock0.send(CAN(flags='extended', identifier=0x10010000, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) sock0.send(CAN(flags='extended', identifier=0x10020000, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) sock0.send(CAN(flags='extended', identifier=0x10000000, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) sock0.send(CAN(flags='extended', identifier=0x10030000, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) sock0.send(CAN(flags='extended', identifier=0x10040000, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) sock0.send(CAN(flags='extended', identifier=0x10000000, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) packetsVCan1 = sock1.sniff(timeout=0.5, count=6) assert len(packetsVCan1) == 6 sock1.close() sock0.close() threadBridge.join(timeout=3) assert not threadBridge.is_alive() = bridge and sniff setup vcan0 package forwarding sock0 = CANSocket(bustype='socketcan', channel='vcan0') sock1 = CANSocket(bustype='socketcan', channel='vcan1') bridgeStarted = threading.Event() def bridge(): global bridgeStarted bSock0 = CANSocket(bustype='socketcan', channel='vcan0') bSock1 = CANSocket(bustype='socketcan', channel='vcan1') def pnr(pkt): return pkt bSock0.timeout = 0.01 bSock1.timeout = 0.01 bridge_and_sniff(if1=bSock0, if2=bSock1, xfrm12=pnr, xfrm21=pnr, timeout=0.5, started_callback=bridgeStarted.set, count=4) bSock0.close() bSock1.close() threadBridge = threading.Thread(target=bridge) threadBridge.start() bridgeStarted.wait(timeout=1) sock1.send(CAN(flags='extended', identifier=0x10010000, length=8, data=b'\x01\x02\x03\x04\x05\x04\x05\x06')) sock1.send(CAN(flags='extended', identifier=0x10010000, length=8, data=b'\x01\x02\x03\x04\x05\x04\x05\x06')) sock1.send(CAN(flags='extended', identifier=0x80, length=8, data=b'\x01\x02\x03\x04\x05\x04\x05\x06')) sock1.send(CAN(flags='extended', identifier=0x10010000, length=8, data=b'\x01\x02\x03\x04\x05\x04\x05\x06')) packetsVCan0 = sock0.sniff(timeout=0.5, count=4) assert len(packetsVCan0) == 4 sock0.close() sock1.close() threadBridge.join(timeout=3) assert not threadBridge.is_alive() =bridge and sniff setup vcan0 vcan1 package forwarding both directions sock0 = CANSocket(bustype='socketcan', channel='vcan0') sock1 = CANSocket(bustype='socketcan', channel='vcan1') bridgeStarted = threading.Event() def bridge(): global bridgeStarted bSock0 = CANSocket(bustype='socketcan', channel='vcan0') bSock1 = CANSocket(bustype='socketcan', channel='vcan1') def pnr(pkt): return pkt bSock0.timeout = 0.01 bSock1.timeout = 0.01 bridge_and_sniff(if1=bSock0, if2=bSock1, xfrm12=pnr, xfrm21=pnr, timeout=0.5, started_callback=bridgeStarted.set, count=10) bSock0.close() bSock1.close() threadBridge = threading.Thread(target=bridge) threadBridge.start() bridgeStarted.wait(timeout=1) sock0.send(CAN(flags='extended', identifier=0x25, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) sock0.send(CAN(flags='extended', identifier=0x20, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) sock0.send(CAN(flags='extended', identifier=0x25, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) sock0.send(CAN(flags='extended', identifier=0x25, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) sock0.send(CAN(flags='extended', identifier=0x20, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) sock0.send(CAN(flags='extended', identifier=0x30, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) sock1.send(CAN(flags='extended', identifier=0x40, length=8, data=b'\x01\x02\x03\x04\x05\x04\x05\x06')) sock1.send(CAN(flags='extended', identifier=0x40, length=8, data=b'\x01\x02\x03\x04\x05\x04\x05\x06')) sock1.send(CAN(flags='extended', identifier=0x80, length=8, data=b'\x01\x02\x03\x04\x05\x04\x05\x06')) sock1.send(CAN(flags='extended', identifier=0x40, length=8, data=b'\x01\x02\x03\x04\x05\x04\x05\x06')) packetsVCan0 = sock0.sniff(timeout=0.5, count=4) packetsVCan1 = sock1.sniff(timeout=0.5, count=6) assert len(packetsVCan0) == 4 assert len(packetsVCan1) == 6 sock0.close() sock1.close() threadBridge.join(timeout=3) assert not threadBridge.is_alive() =bridge and sniff setup vcan1 package change sock0 = CANSocket(bustype='socketcan', channel='vcan0') sock1 = CANSocket(bustype='socketcan', channel='vcan1', can_filters=[{'can_id': 0x10010000, 'can_mask': 0x1fffffff}]) bridgeStarted = threading.Event() def bridgeWithPackageChangeVCan0ToVCan1(): global bridgeStarted bSock0 = CANSocket(bustype='socketcan', channel='vcan0') bSock1 = CANSocket(bustype='socketcan', channel='vcan1') def pnr(pkt): pkt.data = b'\x08\x07\x06\x05\x04\x03\x02\x01' pkt.identifier = 0x10010000 return pkt bSock0.timeout = 0.01 bSock1.timeout = 0.01 bridge_and_sniff(if1=bSock0, if2=bSock1, xfrm12=pnr, timeout=0.5, started_callback=bridgeStarted.set, count=6) bSock0.close() bSock1.close() threadBridge = threading.Thread(target=bridgeWithPackageChangeVCan0ToVCan1) threadBridge.start() bridgeStarted.wait(timeout=1) sock0.send(CAN(flags='extended', identifier=0x10010000, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) sock0.send(CAN(flags='extended', identifier=0x10020000, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) sock0.send(CAN(flags='extended', identifier=0x10000000, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) sock0.send(CAN(flags='extended', identifier=0x10030000, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) sock0.send(CAN(flags='extended', identifier=0x10040000, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) sock0.send(CAN(flags='extended', identifier=0x10000000, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) packetsVCan1 = sock1.sniff(timeout=0.5, count=6) assert len(packetsVCan1) == 6 sock0.close() sock1.close() threadBridge.join(timeout=3) assert not threadBridge.is_alive() =bridge and sniff setup vcan0 package change sock1 = CANSocket(bustype='socketcan', channel='vcan1') sock0 = CANSocket(bustype='socketcan', channel='vcan0', can_filters=[{'can_id': 0x10010000, 'can_mask': 0x1fffffff}]) bridgeStarted = threading.Event() def bridgeWithPackageChangeVCan1ToVCan0(): global bridgeStarted bSock0 = CANSocket(bustype='socketcan', channel='vcan0') bSock1 = CANSocket(bustype='socketcan', channel='vcan1') def pnr(pkt): pkt.data = b'\x08\x07\x06\x05\x04\x03\x02\x01' pkt.identifier = 0x10010000 return pkt bSock0.timeout = 0.01 bSock1.timeout = 0.01 bridge_and_sniff(if1=bSock0, if2=bSock1, xfrm21=pnr, timeout=0.5, started_callback=bridgeStarted.set, count=4) bSock0.close() bSock1.close() threadBridge = threading.Thread(target=bridgeWithPackageChangeVCan1ToVCan0) threadBridge.start() bridgeStarted.wait(timeout=1) sock1.send(CAN(flags='extended', identifier=0x10010000, length=8, data=b'\x01\x02\x03\x04\x05\x04\x05\x06')) sock1.send(CAN(flags='extended', identifier=0x10010000, length=8, data=b'\x01\x02\x03\x04\x05\x04\x05\x06')) sock1.send(CAN(flags='extended', identifier=0x10050000, length=8, data=b'\x01\x02\x03\x04\x05\x04\x05\x06')) sock1.send(CAN(flags='extended', identifier=0x10010000, length=8, data=b'\x01\x02\x03\x04\x05\x04\x05\x06')) packetsVCan0 = sock0.sniff(timeout=0.5, count=4) assert len(packetsVCan0) == 4 sock0.close() sock1.close() threadBridge.join(timeout=3) assert not threadBridge.is_alive() =bridge and sniff setup vcan0 and vcan1 package change in both directions sock0 = CANSocket(bustype='socketcan', channel='vcan0', can_filters=[{'can_id': 0x10010000, 'can_mask': 0x1fffffff}]) sock1 = CANSocket(bustype='socketcan', channel='vcan1', can_filters=[{'can_id': 0x10010000, 'can_mask': 0x1fffffff}]) bridgeStarted = threading.Event() def bridgeWithPackageChangeBothDirections(): global bridgeStarted bSock0 = CANSocket(bustype='socketcan', channel='vcan0') bSock1 = CANSocket(bustype='socketcan', channel='vcan1') def pnr(pkt): pkt.data = b'\x08\x07\x06\x05\x04\x03\x02\x01' pkt.identifier = 0x10010000 return pkt bSock0.timeout = 0.01 bSock1.timeout = 0.01 bridge_and_sniff(if1=bSock0, if2=bSock1, xfrm12=pnr, xfrm21=pnr, timeout=0.5, started_callback=bridgeStarted.set, count=10) bSock0.close() bSock1.close() threadBridge = threading.Thread(target=bridgeWithPackageChangeBothDirections) threadBridge.start() bridgeStarted.wait(timeout=1) sock0.send(CAN(flags='extended', identifier=0x10010000, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) sock0.send(CAN(flags='extended', identifier=0x10020000, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) sock0.send(CAN(flags='extended', identifier=0x10000000, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) sock0.send(CAN(flags='extended', identifier=0x10030000, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) sock0.send(CAN(flags='extended', identifier=0x10040000, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) sock0.send(CAN(flags='extended', identifier=0x10000000, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) sock1.send(CAN(flags='extended', identifier=0x10010000, length=8, data=b'\x01\x02\x03\x04\x05\x04\x05\x06')) sock1.send(CAN(flags='extended', identifier=0x10010000, length=8, data=b'\x01\x02\x03\x04\x05\x04\x05\x06')) sock1.send(CAN(flags='extended', identifier=0x10050000, length=8, data=b'\x01\x02\x03\x04\x05\x04\x05\x06')) sock1.send(CAN(flags='extended', identifier=0x10010000, length=8, data=b'\x01\x02\x03\x04\x05\x04\x05\x06')) packetsVCan0 = sock0.sniff(timeout=0.5, count=4) packetsVCan1 = sock1.sniff(timeout=0.5, count=6) assert len(packetsVCan0) == 4 assert len(packetsVCan1) == 6 sock0.close() sock1.close() threadBridge.join(timeout=3) assert not threadBridge.is_alive() =bridge and sniff setup vcan0 package remove sock0 = CANSocket(bustype='socketcan', channel='vcan0') sock1 = CANSocket(bustype='socketcan', channel='vcan1') bridgeStarted = threading.Event() def bridgeWithRemovePackageFromVCan0ToVCan1(): global bridgeStarted bSock0 = CANSocket(bustype='socketcan', channel='vcan0') bSock1 = CANSocket(bustype='socketcan', channel='vcan1') def pnr(pkt): if(pkt.identifier == 0x10020000): pkt = False else: pkt = pkt return pkt bSock0.timeout = 0.01 bSock1.timeout = 0.01 bridge_and_sniff(if1=bSock0, if2=bSock1, xfrm12=pnr, timeout=0.5, started_callback=bridgeStarted.set, count=6) bSock0.close() bSock1.close() threadBridge = threading.Thread(target=bridgeWithRemovePackageFromVCan0ToVCan1) threadBridge.start() bridgeStarted.wait(timeout=1) sock0.send(CAN(flags='extended', identifier=0x10010000, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) sock0.send(CAN(flags='extended', identifier=0x10020000, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) sock0.send(CAN(flags='extended', identifier=0x10000000, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) sock0.send(CAN(flags='extended', identifier=0x10030000, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) sock0.send(CAN(flags='extended', identifier=0x10040000, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) sock0.send(CAN(flags='extended', identifier=0x10000000, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) packetsVCan1 = sock1.sniff(timeout=0.5, count=5) assert len(packetsVCan1) == 5 sock0.close() sock1.close() threadBridge.join(timeout=3) assert not threadBridge.is_alive() =bridge and sniff setup vcan1 package remove sock0 = CANSocket(bustype='socketcan', channel='vcan0') sock1 = CANSocket(bustype='socketcan', channel='vcan1') bridgeStarted = threading.Event() def bridgeWithRemovePackageFromVCan1ToVCan0(): global bridgeStarted bSock0 = CANSocket(bustype='socketcan', channel='vcan0') bSock1 = CANSocket(bustype='socketcan', channel='vcan1') def pnr(pkt): if(pkt.identifier == 0x10050000): pkt = False else: pkt = pkt return pkt bSock0.timeout = 0.01 bSock1.timeout = 0.01 bridge_and_sniff(if1=bSock0, if2=bSock1, xfrm21=pnr, timeout=0.5, started_callback=bridgeStarted.set, count=4) bSock0.close() bSock1.close() threadBridge = threading.Thread(target=bridgeWithRemovePackageFromVCan1ToVCan0) threadBridge.start() bridgeStarted.wait(timeout=1) sock1.send(CAN(flags='extended', identifier=0x10010000, length=8, data=b'\x01\x02\x03\x04\x05\x04\x05\x06')) sock1.send(CAN(flags='extended', identifier=0x10010000, length=8, data=b'\x01\x02\x03\x04\x05\x04\x05\x06')) sock1.send(CAN(flags='extended', identifier=0x10050000, length=8, data=b'\x01\x02\x03\x04\x05\x04\x05\x06')) sock1.send(CAN(flags='extended', identifier=0x10010000, length=8, data=b'\x01\x02\x03\x04\x05\x04\x05\x06')) packetsVCan0 = sock0.sniff(timeout=0.5, count=3) assert len(packetsVCan0) == 3 sock0.close() sock1.close() threadBridge.join(timeout=3) assert not threadBridge.is_alive() =bridge and sniff setup vcan0 and vcan1 package remove both directions sock0 = CANSocket(bustype='socketcan', channel='vcan0') sock1 = CANSocket(bustype='socketcan', channel='vcan1') bridgeStarted = threading.Event() def bridgeWithRemovePackageInBothDirections(): global bridgeStarted bSock0 = CANSocket(bustype='socketcan', channel='vcan0') bSock1 = CANSocket(bustype='socketcan', channel='vcan1') def pnrA(pkt): if(pkt.identifier == 0x10020000): pkt = False else: pkt = pkt return pkt def pnrB(pkt): if (pkt.identifier == 0x10050000): pkt = False else: pkt = pkt return pkt bSock0.timeout = 0.01 bSock1.timeout = 0.01 bridge_and_sniff(if1=bSock0, if2=bSock1, xfrm12=pnrA, xfrm21=pnrB, timeout=0.5, started_callback=bridgeStarted.set, count=10) bSock0.close() bSock1.close() threadBridge = threading.Thread(target=bridgeWithRemovePackageInBothDirections) threadBridge.start() bridgeStarted.wait(timeout=1) sock0.send(CAN(flags='extended', identifier=0x10010000, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) sock0.send(CAN(flags='extended', identifier=0x10020000, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) sock0.send(CAN(flags='extended', identifier=0x10000000, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) sock0.send(CAN(flags='extended', identifier=0x10030000, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) sock0.send(CAN(flags='extended', identifier=0x10040000, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) sock0.send(CAN(flags='extended', identifier=0x10000000, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) sock1.send(CAN(flags='extended', identifier=0x10010000, length=8, data=b'\x01\x02\x03\x04\x05\x04\x05\x06')) sock1.send(CAN(flags='extended', identifier=0x10010000, length=8, data=b'\x01\x02\x03\x04\x05\x04\x05\x06')) sock1.send(CAN(flags='extended', identifier=0x10050000, length=8, data=b'\x01\x02\x03\x04\x05\x04\x05\x06')) sock1.send(CAN(flags='extended', identifier=0x10010000, length=8, data=b'\x01\x02\x03\x04\x05\x04\x05\x06')) packetsVCan0 = sock0.sniff(timeout=0.5, count=3) packetsVCan1 = sock1.sniff(timeout=0.5, count=5) assert len(packetsVCan0) == 3 assert len(packetsVCan1) == 5 threadBridge.join(timeout=3) assert not threadBridge.is_alive() sock0.close() sock1.close() = Delete vcan interfaces ~ needs_root linux vcan_socket if 0 != call(["sudo", "ip" ,"link", "delete", "vcan0"]): raise Exception("vcan0 could not be deleted") if 0 != call(["sudo", "ip" ,"link", "delete", "vcan1"]): raise Exception("vcan1 could not be deleted")