% Regression tests for ISOTPNativeSocket ~ automotive_comm + Configuration ~ conf = Imports with open(scapy_path("test/contrib/automotive/interface_mockup.py")) as f: exec(f.read()) = Definition of constants, utility functions and mock classes # hexadecimal to bytes convenience function dhex = bytes.fromhex + Compatibility with can-isotp linux kernel modules = Compatibility with isotpsend exit_if_no_isotp_module() message = "01 02 03 04 05 06 07 08 09 0A 0B 0C 0D 0E 0F 10 11 12 13 14" with new_can_socket0() as isocan, ISOTPSoftSocket(isocan, tx_id=0x642, rx_id=0x242) as s: p = subprocess.Popen(["isotpsend", "-s", "242", "-d", "642", iface0], stdin=subprocess.PIPE, universal_newlines=True) p.communicate(message) r = p.returncode assert r == 0 isotp = s.recv() assert isotp.data == dhex(message) = Compatibility with isotpsend - extended addresses exit_if_no_isotp_module() message = "01 02 03 04 05 06 07 08 09 0A 0B 0C 0D 0E 0F 10 11 12 13 14" with new_can_socket0() as isocan, ISOTPSoftSocket(isocan, tx_id=0x644, rx_id=0x244, ext_address=0xaa, rx_ext_address=0xee) as s: p = subprocess.Popen(["isotpsend", "-s", "244", "-d", "644", "-x", "ee:aa", iface0], stdin=subprocess.PIPE, universal_newlines=True) p.communicate(message) r = p.returncode assert r == 0 isotp = s.recv() assert isotp.data == dhex(message) = Compatibility with isotprecv exit_if_no_isotp_module() isotp = ISOTP(data=bytearray(range(1,20))) p = subprocess.Popen(["isotprecv", "-s", "243", "-d", "643", "-b", "3", iface0], stdout=subprocess.PIPE) time.sleep(0.1) with new_can_socket0() as isocan, ISOTPSoftSocket(isocan, tx_id=0x643, rx_id=0x243) as s: s.send(isotp) timer = threading.Timer(1, lambda: p.terminate() if p.poll() else p.wait()) timer.start() # Timeout the receiver after 1 second r = p.wait() assert 0 == r result = None for i in range(10): time.sleep(0.1) if p.poll() is not None: result = p.stdout.readline().decode().strip() break assert result is not None result_data = dhex(result) assert result_data == isotp.data timer.join(5) assert not timer.is_alive() = Compatibility with isotprecv - extended addresses exit_if_no_isotp_module() isotp = ISOTP(data=bytearray(range(1,20))) cmd = ["isotprecv", "-s245", "-d645", "-b3", "-x", "ee:aa", iface0] p = subprocess.Popen(cmd, stdout=subprocess.PIPE) time.sleep(0.1) # Give some time for starting reception with new_can_socket0() as isocan, ISOTPSoftSocket(isocan, tx_id=0x645, rx_id=0x245, ext_address=0xaa, rx_ext_address=0xee) as s: s.send(isotp) timer = threading.Timer(1, lambda: p.terminate() if p.poll() else p.wait()) timer.start() # Timeout the receiver after 1 second r = p.wait() assert 0 == r result = None for i in range(10): time.sleep(0.1) if p.poll() is not None: result = p.stdout.readline().decode().strip() break assert result is not None result_data = dhex(result) assert result_data == isotp.data timer.join(5) assert not timer.is_alive() = Compatibility ISOTPSoftSocket ISOTPNativeSocket various configs exit_if_no_isotp_module() message = "01 02 03 04 05 06 07 08 09 0A 0B 0C 0D 0E 0F 10 11 12 13 14" * 5 kwargs = [({"tx_id": 0x242, "rx_id": 0x642, "bs": 0, "stmin": 0, "padding": False, "ext_address": None, "rx_ext_address": None}, {"tx_id": 0x642, "rx_id": 0x242, "bs": 0, "stmin": 0, "padding": False, "ext_address": None, "rx_ext_address": None}), ({"tx_id": 0x242, "rx_id": 0x642, "bs": 2, "stmin": 0, "padding": False, "ext_address": None, "rx_ext_address": None}, {"tx_id": 0x642, "rx_id": 0x242, "bs": 0, "stmin": 0, "padding": False, "ext_address": None, "rx_ext_address": None}), ({"tx_id": 0x242, "rx_id": 0x642, "bs": 5, "stmin": 0, "padding": False, "ext_address": None, "rx_ext_address": None}, {"tx_id": 0x642, "rx_id": 0x242, "bs": 0, "stmin": 0, "padding": False, "ext_address": None, "rx_ext_address": None}), ({"tx_id": 0x242, "rx_id": 0x642, "bs": 0, "stmin": 5, "padding": False, "ext_address": None, "rx_ext_address": None}, {"tx_id": 0x642, "rx_id": 0x242, "bs": 0, "stmin": 0, "padding": False, "ext_address": None, "rx_ext_address": None}), ({"tx_id": 0x242, "rx_id": 0x642, "bs": 0, "stmin": 10, "padding": False, "ext_address": None, "rx_ext_address": None}, {"tx_id": 0x642, "rx_id": 0x242, "bs": 0, "stmin": 0, "padding": False, "ext_address": None, "rx_ext_address": None}), ({"tx_id": 0x242, "rx_id": 0x642, "bs": 4, "stmin": 130, "padding": False, "ext_address": None, "rx_ext_address": None}, {"tx_id": 0x642, "rx_id": 0x242, "bs": 0, "stmin": 0, "padding": False, "ext_address": None, "rx_ext_address": None}), ({"tx_id": 0x242, "rx_id": 0x642, "bs": 3, "stmin": 0, "padding": True, "ext_address": None, "rx_ext_address": None}, {"tx_id": 0x642, "rx_id": 0x242, "bs": 0, "stmin": 0, "padding": True, "ext_address": None, "rx_ext_address": None}), ({"tx_id": 0x242, "rx_id": 0x642, "bs": 0, "stmin": 0, "padding": False, "ext_address": 0xfe, "rx_ext_address": None}, {"tx_id": 0x642, "rx_id": 0x242, "bs": 0, "stmin": 0, "padding": False, "ext_address": 0xfe, "rx_ext_address": None}), ({"tx_id": 0x242, "rx_id": 0x642, "bs": 0, "stmin": 0, "padding": False, "ext_address": 0xfe, "rx_ext_address": 0xef}, {"tx_id": 0x642, "rx_id": 0x242, "bs": 0, "stmin": 0, "padding": False, "ext_address": 0xef, "rx_ext_address": 0xfe}), ({"tx_id": 0x242, "rx_id": 0x642, "bs": 6, "stmin": 10, "padding": True, "ext_address": 0x12, "rx_ext_address": 0x23}, {"tx_id": 0x642, "rx_id": 0x242, "bs": 6, "stmin": 5, "padding": True, "ext_address": 0x23, "rx_ext_address": 0x12}), ({"tx_id": 0x242, "rx_id": 0x642, "bs": 0, "stmin": 0, "padding": True, "ext_address": 0x45, "rx_ext_address": None}, {"tx_id": 0x642, "rx_id": 0x242, "bs": 0, "stmin": 40, "padding": True, "ext_address": 0x45, "rx_ext_address": None}), ({"tx_id": 0x123, "rx_id": 0x642, "bs": 1, "stmin": 1, "padding": False, "ext_address": None, "rx_ext_address": None}, {"tx_id": 0x642, "rx_id": 0x123, "bs": 1, "stmin": 1, "padding": False, "ext_address": None, "rx_ext_address": None}),] for kwargs1, kwargs2 in kwargs: print("Testing config %s, %s" % (kwargs1, kwargs2)) with NativeCANSocket(iface0) as cs: cs.sniff(timeout=0.01) with ISOTPSoftSocket(iface0, **kwargs1) as s, ISOTPNativeSocket(iface0, **kwargs2) as ns: ns.send(ISOTP(bytes.fromhex(message))) isotp = s.recv() assert isotp.data == dhex(message) ns.send(ISOTP(bytes.fromhex("00 11 22"))) isotp = s.recv() assert (isotp.data == dhex("00 11 22")) pks1 = cs.sniff(timeout=0.01) with ISOTPNativeSocket(iface0, **kwargs1) as s, ISOTPSoftSocket(iface0, **kwargs2) as ns: ns.send(ISOTP(bytes.fromhex(message))) isotp = s.recv() assert isotp.data == dhex(message) ns.send(ISOTP(bytes.fromhex("00 11 22"))) isotp = s.recv() assert (isotp.data == dhex("00 11 22")) pks2 = cs.sniff(timeout=0.01) assert len(pks1) == len(pks2) and len(pks2) > 0 for p1, p2 in zip(pks1, pks2): assert bytes(p1) == bytes(p2) + ISOTPNativeSocket tests = Create ISOTP socket exit_if_no_isotp_module() s = ISOTPNativeSocket(iface0, tx_id=0x641, rx_id=0x241) = Send single frame ISOTP message exit_if_no_isotp_module() with new_can_socket(iface0) as cans: s = ISOTPNativeSocket(iface0, tx_id=0x641, rx_id=0x241) s.send(ISOTP(data=dhex("01 02 03 04 05"))) can = cans.sniff(timeout=1, count=1)[0] assert can.identifier == 0x641 assert can.data == dhex("05 01 02 03 04 05") = Send single frame ISOTP message Test init with CANSocket exit_if_no_isotp_module() cans = CANSocket(iface0) s = ISOTPNativeSocket(cans, tx_id=0x641, rx_id=0x241) s.send(ISOTP(data=dhex("01 02 03 04 05"))) can = cans.sniff(timeout=1, count=1)[0] assert can.identifier == 0x641 assert can.data == dhex("05 01 02 03 04 05") cans.close() = Test init with wrong type exit_if_no_isotp_module() exception_catched = False try: s = ISOTPNativeSocket(42, tx_id=0x641, rx_id=0x241) except Scapy_Exception: exception_catched = True assert exception_catched = Send two-frame ISOTP message exit_if_no_isotp_module() evt = threading.Event() def acker(): with new_can_socket(iface0) as cans: evt.set() can = cans.sniff(timeout=1, count=1)[0] cans.send(CAN(identifier = 0x241, data=dhex("30 00 00"))) with new_can_socket(iface0) as cans: t = Thread(target=acker) t.start() s = ISOTPNativeSocket(iface0, tx_id=0x641, rx_id=0x241) evt.wait(timeout=5) s.send(ISOTP(data=dhex("01 02 03 04 05 06 07 08"))) can = cans.sniff(timeout=1, count=1)[0] assert can.identifier == 0x641 assert can.data == dhex("10 08 01 02 03 04 05 06") can = cans.sniff(timeout=1, count=1)[0] assert can.identifier == 0x241 assert can.data == dhex("30 00 00") can = cans.sniff(timeout=1, count=1)[0] assert can.identifier == 0x641 assert can.data == dhex("21 07 08") t.join(timeout=5) assert not t.is_alive() = Send a single frame ISOTP message with padding exit_if_no_isotp_module() s = ISOTPNativeSocket(iface0, tx_id=0x641, rx_id=0x241, padding=True) with new_can_socket(iface0) as cans: s.send(ISOTP(data=dhex("01"))) can = cans.sniff(timeout=1, count=1)[0] assert can.length == 8 = Send a two-frame ISOTP message with padding exit_if_no_isotp_module() acker_ready = threading.Event() def acker(): with new_can_socket(iface0) as acks: acker_ready.set() can = acks.sniff(timeout=1, count=1)[0] acks.send(CAN(identifier = 0x241, data=dhex("30 00 00"))) with new_can_socket(iface0) as cans: thread = Thread(target=acker) thread.start() acker_ready.wait(timeout=5) s = ISOTPNativeSocket(iface0, tx_id=0x641, rx_id=0x241, padding=True) s.send(ISOTP(data=dhex("01 02 03 04 05 06 07 08"))) can = cans.sniff(timeout=1, count=1)[0] assert can.identifier == 0x641 assert can.data == dhex("10 08 01 02 03 04 05 06") can = cans.sniff(timeout=1, count=1)[0] assert can.identifier == 0x241 assert can.data == dhex("30 00 00") can = cans.sniff(timeout=1, count=1)[0] assert can.identifier == 0x641 assert can.data == dhex("21 07 08 CC CC CC CC CC") thread.join(5) assert not thread.is_alive() = Receive a padded single frame ISOTP message with padding disabled exit_if_no_isotp_module() s = ISOTPNativeSocket(iface0, tx_id=0x641, rx_id=0x241, padding=False) with new_can_socket(iface0) as cans: cans.send(CAN(identifier=0x241, data=dhex("02 05 06 00 00 00 00 00"))) res = s.recv() assert res.data == dhex("05 06") = Receive a padded single frame ISOTP message with padding enabled exit_if_no_isotp_module() s = ISOTPNativeSocket(iface0, tx_id=0x641, rx_id=0x241, padding=True) with new_can_socket(iface0) as cans: cans.send(CAN(identifier=0x241, data=dhex("02 05 06 00 00 00 00 00"))) res = s.recv() assert res.data == dhex("05 06") = Receive a non-padded single frame ISOTP message with padding enabled exit_if_no_isotp_module() s = ISOTPNativeSocket(iface0, tx_id=0x641, rx_id=0x241, padding=True) with new_can_socket(iface0) as cans: cans.send(CAN(identifier=0x241, data=dhex("02 05 06"))) res = s.recv() assert res.data == dhex("05 06") = Receive a padded two-frame ISOTP message with padding enabled exit_if_no_isotp_module() s = ISOTPNativeSocket(iface0, tx_id=0x641, rx_id=0x241, padding=True) with new_can_socket(iface0) as cans: cans.send(CAN(identifier=0x241, data=dhex("10 09 01 02 03 04 05 06"))) cans.send(CAN(identifier=0x241, data=dhex("21 07 08 09 00 00 00 00"))) res = s.recv() assert res.data == dhex("01 02 03 04 05 06 07 08 09") = Receive a padded two-frame ISOTP message with padding disabled exit_if_no_isotp_module() s = ISOTPNativeSocket(iface0, tx_id=0x641, rx_id=0x241, padding=False) with new_can_socket(iface0) as cans: cans.send(CAN(identifier=0x241, data=dhex("10 09 01 02 03 04 05 06"))) cans.send(CAN(identifier=0x241, data=dhex("21 07 08 09 00 00 00 00"))) res = s.recv() assert res.data == dhex("01 02 03 04 05 06 07 08 09") = Receive a single frame ISOTP message exit_if_no_isotp_module() s = ISOTPNativeSocket(iface0, tx_id=0x641, rx_id=0x241) with new_can_socket(iface0) as cans: cans.send(CAN(identifier = 0x241, data = dhex("05 01 02 03 04 05"))) isotp = s.recv() assert isotp.data == dhex("01 02 03 04 05") assert isotp.tx_id == 0x641 assert isotp.rx_id == 0x241 assert isotp.ext_address == None assert isotp.rx_ext_address == None = Receive a single frame ISOTP message, with extended addressing exit_if_no_isotp_module() s = ISOTPNativeSocket(iface0, tx_id=0x641, rx_id=0x241, ext_address=0xc0, rx_ext_address=0xea) with new_can_socket(iface0) as cans: cans.send(CAN(identifier = 0x241, data = dhex("EA 05 01 02 03 04 05"))) isotp = s.recv() assert isotp.data == dhex("01 02 03 04 05") assert isotp.tx_id == 0x641 assert isotp.rx_id == 0x241 assert isotp.ext_address == 0xc0 assert isotp.rx_ext_address == 0xea = Receive a two-frame ISOTP message exit_if_no_isotp_module() s = ISOTPNativeSocket(iface0, tx_id=0x641, rx_id=0x241) with new_can_socket(iface0) as cans: cans.send(CAN(identifier = 0x241, data = dhex("10 0B 01 02 03 04 05 06"))) cans.send(CAN(identifier = 0x241, data = dhex("21 07 08 09 10 11"))) isotp = s.recv() assert isotp.data == dhex("01 02 03 04 05 06 07 08 09 10 11") = Receive a two-frame ISOTP message and test python with statement exit_if_no_isotp_module() with ISOTPNativeSocket(iface0, tx_id=0x641, rx_id=0x241) as s: with new_can_socket(iface0) as cans: cans.send(CAN(identifier = 0x241, data = dhex("10 0B 01 02 03 04 05 06"))) cans.send(CAN(identifier = 0x241, data = dhex("21 07 08 09 10 11"))) isotp = s.recv() assert isotp.data == dhex("01 02 03 04 05 06 07 08 09 10 11") = Send single CANFD frame ISOTP message exit_if_no_isotp_module() with new_can_socket(iface0, fd=True) as cans: s = ISOTPNativeSocket(iface0, tx_id=0x641, rx_id=0x241, fd=True) s.send(ISOTP(data=dhex("01 02 03 04 05 06 07 08 09"))) can = cans.sniff(timeout=1, count=1)[0] assert can.identifier == 0x641 assert can.data == dhex("09 01 02 03 04 05 06 07 08 09") = ISOTP Socket sr1 test exit_if_no_isotp_module() txSock = ISOTPNativeSocket(iface0, tx_id=0x123, rx_id=0x321, basecls=ISOTP) txmsg = ISOTP(b'\x11\x22\x33') rx2 = None receiver_up = Event() def sender(): global receiver_up receiver_up.wait(timeout=5) global txmsg global rx2 rx2 = txSock.sr1(txmsg, timeout=1, verbose=True) def receiver(): global receiver_up with new_can_socket(iface0) as cans: rx = cans.sniff(timeout=1, count=1, started_callback=receiver_up.set)[0] cans.send(CAN(identifier=0x321, length=4, data=b'\x03\x7f\x22\x33')) expectedrx = CAN(identifier=0x123, length=4, data=b'\x03\x11\x22\x33') assert rx.length == expectedrx.length assert rx.data == expectedrx.data assert rx.identifier == expectedrx.identifier txThread = threading.Thread(target=sender) txThread.start() receiver() txThread.join(timeout=5) assert not txThread.is_alive() assert rx2 is not None assert rx2 == ISOTP(b'\x7f\x22\x33') assert rx2.answers(txmsg) = ISOTP Socket sr1 and ISOTP test exit_if_no_isotp_module() txSock = ISOTPNativeSocket(iface0, 0x123, 0x321, basecls=ISOTP) rxSock = ISOTPNativeSocket(iface0, 0x321, 0x123, basecls=ISOTP) msg = ISOTP(b'\x11\x22\x33\x11\x22\x33\x11\x22\x33\x11\x22\x33') rx2 = None receiver_up = Event() def sender(): receiver_up.wait(timeout=5) global rx2 rx2 = txSock.sr1(msg, timeout=1, verbose=True) def receiver(): global rx receiver_up.set() rx = rxSock.recv() rxSock.send(msg) txThread = threading.Thread(target=sender) txThread.start() receiver() txThread.join(timeout=5) assert not txThread.is_alive() assert rx == msg assert rxSock.send(msg) assert rx2 is not None assert rx2 == msg = ISOTP Socket sr1 and ISOTP test vice versa exit_if_no_isotp_module() rxSock = ISOTPNativeSocket(iface0, 0x321, 0x123, basecls=ISOTP) txSock = ISOTPNativeSocket(iface0, 0x123, 0x321, basecls=ISOTP) msg = ISOTP(b'\x11\x22\x33\x11\x22\x33\x11\x22\x33\x11\x22\x33') receiver_up = Event() def receiver(): global rx2, sent rx2 = rxSock.sniff(count=1, timeout=1, started_callback=receiver_up.set) sent = rxSock.send(msg) def sender(): global rx receiver_up.wait(timeout=5) rx = txSock.sr1(msg, timeout=1,verbose=True) rx2 = None sent = False rxThread = threading.Thread(target=receiver) rxThread.start() sender() rxThread.join(timeout=5) assert not rxThread.is_alive() assert rx == msg assert rx2[0] == msg assert sent = ISOTP Socket sniff exit_if_no_isotp_module() rxSock = ISOTPNativeSocket(iface0, 0x321, 0x123, basecls=ISOTP) txSock = ISOTPNativeSocket(iface0, 0x123, 0x321, basecls=ISOTP) succ = False receiver_up = Event() def receiver(): rx = rxSock.sniff(count=5, timeout=1, started_callback=receiver_up.set) msg = ISOTP(b'\x11\x22\x33\x11\x22\x33\x11\x22\x33\x11\x22\x33') msg.data += b'0' assert rx[0] == msg msg.data += b'1' assert rx[1] == msg msg.data += b'2' assert rx[2] == msg msg.data += b'3' assert rx[3] == msg msg.data += b'4' assert rx[4] == msg global succ succ = True def sender(): receiver_up.wait(timeout=5) msg = ISOTP(b'\x11\x22\x33\x11\x22\x33\x11\x22\x33\x11\x22\x33') msg.data += b'0' assert txSock.send(msg) msg.data += b'1' assert txSock.send(msg) msg.data += b'2' assert txSock.send(msg) msg.data += b'3' assert txSock.send(msg) msg.data += b'4' assert txSock.send(msg) rxThread = threading.Thread(target=receiver) rxThread.start() sender() rxThread.join(timeout=5) assert not rxThread.is_alive() assert succ + ISOTPNativeSocket MITM attack tests ~ vcan_socket needs_root linux = bridge and sniff with isotp native sockets set up vcan0 and vcan1 for package forwarding vcan1 exit_if_no_isotp_module() isoTpSocket0 = ISOTPNativeSocket(iface0, tx_id=0x241, rx_id=0x641) isoTpSocket1 = ISOTPNativeSocket(iface1, tx_id=0x641, rx_id=0x241) bSocket0 = ISOTPNativeSocket(iface0, tx_id=0x641, rx_id=0x241) bSocket1 = ISOTPNativeSocket(iface1, tx_id=0x241, rx_id=0x641) bridgeStarted = threading.Event() def bridge(): global bridgeStarted def forwarding(pkt): return pkt bridge_and_sniff(if1=bSocket0, if2=bSocket1, xfrm12=forwarding, xfrm21=forwarding, timeout=2, count=1, started_callback=bridgeStarted.set) bSocket0.close() bSocket1.close() global bSucc bSucc = True bSucc = False threadBridge = threading.Thread(target=bridge) threadBridge.start() bridgeStarted.wait(timeout=5) isoTpSocket0.send(ISOTP(b'Request')) packetsVCan1 = isoTpSocket1.sniff(timeout=0.5, count=1) assert len(packetsVCan1) == 1 isoTpSocket0.close() isoTpSocket1.close() threadBridge.join(timeout=5) assert not threadBridge.is_alive() assert bSucc = bridge and sniff with isotp native sockets set up vcan0 and vcan1 for package change to vcan1 exit_if_no_isotp_module() isoTpSocket0 = ISOTPNativeSocket(iface0, tx_id=0x241, rx_id=0x641) isoTpSocket1 = ISOTPNativeSocket(iface1, tx_id=0x641, rx_id=0x241) bSocket0 = ISOTPNativeSocket(iface0, tx_id=0x641, rx_id=0x241) bSocket1 = ISOTPNativeSocket(iface1, tx_id=0x241, rx_id=0x641) bSucc = False bridgeStarted = threading.Event() def bridge(): global bridgeStarted global bSucc def forwarding(pkt): pkt.data = 'changed' return pkt bridge_and_sniff(if1=bSocket0, if2=bSocket1, xfrm12=forwarding, xfrm21=forwarding, timeout=0.5, started_callback=bridgeStarted.set) bSocket0.close() bSocket1.close() bSucc = True threadBridge = threading.Thread(target=bridge) threadBridge.start() bridgeStarted.wait(timeout=5) isoTpSocket0.send(ISOTP(b'Request')) packetsVCan1 = isoTpSocket1.sniff(timeout=0.5, count=1) packetsVCan1[0].data = b'changed' assert len(packetsVCan1) == 1 isoTpSocket0.close() isoTpSocket1.close() threadBridge.join(timeout=5) assert not threadBridge.is_alive() assert bSucc = bridge and sniff with isotp native sockets set up vcan0 and vcan1 for package forwarding in both directions exit_if_no_isotp_module() bSucc = False isoTpSocket0 = ISOTPNativeSocket(iface0, tx_id=0x241, rx_id=0x641) isoTpSocket1 = ISOTPNativeSocket(iface1, tx_id=0x641, rx_id=0x241) bSocket0 = ISOTPNativeSocket(iface0, tx_id=0x641, rx_id=0x241) bSocket1 = ISOTPNativeSocket(iface1, tx_id=0x241, rx_id=0x641) bridgeStarted = threading.Event() def bridge(): global bridgeStarted global bSucc def forwarding(pkt): return pkt bridge_and_sniff(if1=bSocket0, if2=bSocket1, xfrm12=forwarding, xfrm21=forwarding, timeout=0.5, started_callback=bridgeStarted.set, count=2) bSocket0.close() bSocket1.close() bSucc = True threadBridge = threading.Thread(target=bridge) threadBridge.start() bridgeStarted.wait(timeout=5) packetVcan0 = ISOTP(b'RequestVcan0') packetVcan1 = ISOTP(b'RequestVcan1') isoTpSocket0.send(packetVcan0) isoTpSocket1.send(packetVcan1) packetsVCan0 = isoTpSocket0.sniff(timeout=0.5, count=1) packetsVCan1 = isoTpSocket1.sniff(timeout=0.5, count=1) len(packetsVCan0) == 1 len(packetsVCan1) == 1 isoTpSocket0.close() isoTpSocket1.close() threadBridge.join(timeout=5) assert not threadBridge.is_alive() assert bSucc = bridge and sniff with isotp native sockets set up vcan0 and vcan1 for package change in both directions exit_if_no_isotp_module() bSucc = False isoTpSocket0 = ISOTPNativeSocket(iface0, tx_id=0x241, rx_id=0x641) isoTpSocket1 = ISOTPNativeSocket(iface1, tx_id=0x641, rx_id=0x241) bSocket0 = ISOTPNativeSocket(iface0, tx_id=0x641, rx_id=0x241) bSocket1 = ISOTPNativeSocket(iface1, tx_id=0x241, rx_id=0x641) bridgeStarted = threading.Event() def bridge(): global bridgeStarted global bSucc def forwarding(pkt): pkt.data = 'changed' return pkt bridge_and_sniff(if1=bSocket0, if2=bSocket1, xfrm12=forwarding, xfrm21=forwarding, timeout=0.5, started_callback=bridgeStarted.set, count=2) bSocket0.close() bSocket1.close() bSucc = True threadBridge = threading.Thread(target=bridge) threadBridge.start() bridgeStarted.wait(timeout=5) packetVcan0 = ISOTP(b'RequestVcan0') packetVcan1 = ISOTP(b'RequestVcan1') isoTpSocket0.send(packetVcan0) isoTpSocket1.send(packetVcan1) packetsVCan0 = isoTpSocket0.sniff(timeout=0.5, count=1) packetsVCan1 = isoTpSocket1.sniff(timeout=0.5, count=1) packetsVCan0[0].data = b'changed' assert len(packetsVCan0) == 1 packetsVCan1[0].data = b'changed' assert len(packetsVCan1) == 1 isoTpSocket0.close() isoTpSocket1.close() threadBridge.join(timeout=5) assert not threadBridge.is_alive() assert bSucc + Cleanup = Cleanup reference to ISOTPSoftSocket to let the thread end s = None = Delete vcan interfaces assert cleanup_interfaces()