% Regression tests for ISOTPSoftSocket ~ automotive_comm + Configuration ~ conf = Imports import time from io import BytesIO from scapy.layers.can import * from scapy.contrib.isotp import * from scapy.contrib.isotp.isotp_soft_socket import TimeoutScheduler from test.testsocket import TestSocket, cleanup_testsockets with open(scapy_path("test/contrib/automotive/interface_mockup.py")) as f: exec(f.read()) = Redirect logging import logging from scapy.error import log_runtime from io import StringIO log_stream = StringIO() handler = logging.StreamHandler(log_stream) log_runtime.addHandler(handler) log_isotp.addHandler(handler) = Definition of utility functions # hexadecimal to bytes convenience function dhex = bytes.fromhex + Test sniffer = Test sniffer with multiple frames test_frames = [ (0x241, "EA 10 28 01 02 03 04 05"), (0x641, "EA 30 03 00" ), (0x241, "EA 21 06 07 08 09 0A 0B"), (0x241, "EA 22 0C 0D 0E 0F 10 11"), (0x241, "EA 23 12 13 14 15 16 17"), (0x641, "EA 30 03 00" ), (0x241, "EA 24 18 19 1A 1B 1C 1D"), (0x241, "EA 25 1E 1F 20 21 22 23"), (0x241, "EA 26 24 25 26 27 28" ), ] with TestSocket(CAN) as s, TestSocket(CAN) as tx_sock: s.pair(tx_sock) for f in test_frames: tx_sock.send(CAN(identifier=f[0], data=dhex(f[1]))) sniffed = sniff(opened_socket=s, session=ISOTPSession, timeout=1, count=1) assert sniffed[0]['ISOTP'].data == bytearray(range(1, 0x29)) assert sniffed[0]['ISOTP'].tx_id == 0x641 assert sniffed[0]['ISOTP'].ext_address == 0xEA assert sniffed[0]['ISOTP'].rx_id == 0x241 assert sniffed[0]['ISOTP'].rx_ext_address == 0xEA + ISOTPSoftSocket tests = CAN socket FD ~ not_pypy needs_root linux vcan_socket with ISOTPSoftSocket(iface0, tx_id=0x641, rx_id=0x241, fd=True) as s: assert s.impl.can_socket.fd == True = CAN socket non-FD ~ not_pypy needs_root linux vcan_socket with ISOTPSoftSocket(iface0, tx_id=0x641, rx_id=0x241) as s: assert s.impl.can_socket.fd == False = Single-frame receive with TestSocket(CAN) as cans, TestSocket(CAN) as stim, ISOTPSoftSocket(cans, tx_id=0x641, rx_id=0x241) as s: cans.pair(stim) stim.send(CAN(identifier=0x241, data=dhex("05 01 02 03 04 05"))) pkts = s.sniff(count=1, timeout=1) if not len(pkts): s.failure_analysis() raise Scapy_Exception("ERROR") msg = pkts[0] assert msg.data == dhex("01 02 03 04 05") = Single-frame receive FD with TestSocket(CANFD) as cans, TestSocket(CANFD) as stim, ISOTPSoftSocket(cans, tx_id=0x641, rx_id=0x241, fd=True) as s: pl_sizes_testings = [1, 5, 7, 8, 15, 20, 35, 40, 46, 62] data_str = "" data_str_offset = 0 cans.pair(stim) for size_to_send in pl_sizes_testings: if size_to_send > 7: data_str = "00 {} {}".format("%02X" % size_to_send, " ".join(["%02X" % x for x in range(size_to_send)])) data_str_offset = 6 else: data_str = "{} {}".format("%02X" % size_to_send, " ".join(["%02X" % x for x in range(size_to_send)])) data_str_offset = 2 stim.send(CANFD(identifier=0x241, data=dhex(data_str))) pkts = s.sniff(count=1, timeout=1) if not len(pkts): s.failure_analysis() raise Scapy_Exception("ERROR") msg = pkts[0] assert msg.data == dhex(data_str[data_str_offset:]) = Single-frame send with TestSocket(CAN) as cans, TestSocket(CAN) as stim, ISOTPSoftSocket(cans, tx_id=0x641, rx_id=0x241) as s: cans.pair(stim) s.send(ISOTP(dhex("01 02 03 04 05"))) pkts = stim.sniff(count=1, timeout=1) if not len(pkts): s.failure_analysis() raise Scapy_Exception("ERROR") msg = pkts[0] assert msg.data == dhex("05 01 02 03 04 05") = Single-frame send FD with TestSocket(CANFD) as cans, TestSocket(CANFD) as stim, ISOTPSoftSocket(cans, tx_id=0x641, rx_id=0x241, fd=True) as s: pl_sizes_testings = [1, 5, 7, 8, 15, 20, 35, 40, 46, 62] data_str = "" data_str_offset = 0 cans.pair(stim) for size_to_send in pl_sizes_testings: if size_to_send > 7: data_str = "00 {} {}".format("%02X" % size_to_send, " ".join(["%02X" % x for x in range(size_to_send)])) data_str_offset = 6 else: data_str = "{} {}".format("%02X" % size_to_send, " ".join(["%02X" % x for x in range(size_to_send)])) data_str_offset = 2 s.send(ISOTP(dhex(data_str[data_str_offset:]))) pkts = stim.sniff(count=1, timeout=1) if not len(pkts): s.failure_analysis() raise Scapy_Exception("ERROR") msg = pkts[0] assert msg.data == dhex(data_str) = Two frame receive with TestSocket(CAN) as cans, TestSocket(CAN) as stim, ISOTPSoftSocket(cans, tx_id=0x641, rx_id=0x241) as s: cans.pair(stim) stim.send(CAN(identifier=0x241, data=dhex("10 09 01 02 03 04 05 06"))) pkts = stim.sniff(count=1, timeout=1) if not len(pkts): s.failure_analysis() raise Scapy_Exception("ERROR") c = pkts[0] assert (c.data == dhex("30 00 00")) stim.send(CAN(identifier=0x241, data=dhex("21 07 08 09 00 00 00 00"))) pkts = s.sniff(count=1, timeout=1) if not len(pkts): s.failure_analysis() raise Scapy_Exception("ERROR") msg = pkts[0] assert msg.data == dhex("01 02 03 04 05 06 07 08 09") = Two frame receive FD with TestSocket(CANFD) as cans, TestSocket(CANFD) as stim, ISOTPSoftSocket(cans, tx_id=0x641, rx_id=0x241, fd=True) as s: cans.pair(stim) stim.send(CANFD(identifier=0x241, data=dhex("10 09 01 02 03 04 05 06 07 08 09 0A 0B"))) pkts = stim.sniff(count=1, timeout=1) if not len(pkts): s.failure_analysis() raise Scapy_Exception("ERROR") c = pkts[0] assert (c.data == dhex("30 00 00")) stim.send(CANFD(identifier=0x241, data=dhex("21 07 08 09 00 00 00 00"))) pkts = s.sniff(count=1, timeout=1) if not len(pkts): s.failure_analysis() raise Scapy_Exception("ERROR") msg = pkts[0] assert msg.data == dhex("01 02 03 04 05 06 07 08 09") = 20000 bytes receive def test(): with TestSocket(CAN) as cans, TestSocket(CAN) as stim, ISOTPSoftSocket(cans, tx_id=0x641, rx_id=0x241) as s: cans.pair(stim) data = dhex("01 02 03 04 05") * 4000 cf = ISOTP(data, rx_id=0x241).fragment() ff = cf.pop(0) cs = stim.sniff(count=1, timeout=3, started_callback=lambda: stim.send(ff)) assert len(cs) c = cs[0] assert (c.data == dhex("30 00 00")) for f in cf: _ = stim.send(f) msgs = s.sniff(count=1, timeout=30) print(msgs) msg = msgs[0] assert msg.data == data test() = 20000 bytes send def test(): with TestSocket(CAN) as cans, TestSocket(CAN) as stim, ISOTPSoftSocket(cans, tx_id=0x641, rx_id=0x241) as s: cans.pair(stim) data = dhex("01 02 03 04 05")*4000 msg = ISOTP(data, rx_id=0x641) fragments = msg.fragment() ack = CAN(identifier=0x241, data=dhex("30 00 00")) ff = stim.sniff(timeout=1, count=1, started_callback=lambda:s.send(msg)) assert len(ff) == 1 cfs = stim.sniff(timeout=20, count=len(fragments) - 1, started_callback=lambda: stim.send(ack)) for fragment, cf in zip(fragments, ff + cfs): assert (bytes(fragment) == bytes(cf)) test() = 20000 bytes send FD def testfd(): with TestSocket(CANFD) as cans, TestSocket(CANFD) as stim, ISOTPSoftSocket(cans, tx_id=0x641, rx_id=0x241, fd=True) as s: cans.pair(stim) data = dhex("01 02 03 04 05")*4006 msg = ISOTP(data, rx_id=0x641) fragments = msg.fragment(fd=True) ack = CANFD(identifier=0x241, data=dhex("30 00 00")) ff = stim.sniff(timeout=1, count=1, started_callback=lambda:s.send(msg)) assert len(ff) == 1 cfs = stim.sniff(timeout=20, count=len(fragments) - 1, started_callback=lambda: stim.send(ack)) for fragment, cf in zip(fragments, ff + cfs): assert (bytes(fragment) == bytes(cf)) testfd() = Close ISOTPSoftSocket with TestSocket(CAN) as cans, TestSocket(CAN) as stim, ISOTPSoftSocket(cans, tx_id=0x641, rx_id=0x241) as s: cans.pair(stim) s.close() s = None = Test on_recv function with single frame with ISOTPSoftSocket(TestSocket(CAN), tx_id=0x641, rx_id=0x241) as s: s.ins.on_recv(CAN(identifier=0x241, data=dhex("05 01 02 03 04 05"))) msg, ts = s.ins.rx_queue.recv() assert msg == dhex("01 02 03 04 05") = Test on_recv function with single frame FD with ISOTPSoftSocket(TestSocket(CANFD), tx_id=0x641, rx_id=0x241, fd=True) as s: pl_sizes_testings = [1, 5, 7, 8, 15, 20, 35, 40, 46, 62] data_str = "" data_str_offset = 0 for size_to_send in pl_sizes_testings: if size_to_send > 7: data_str = "00 {} {}".format("%02X" % size_to_send, " ".join(["%02X" % x for x in range(size_to_send)])) data_str_offset = 6 else: data_str = "{} {}".format("%02X" % size_to_send, " ".join(["%02X" % x for x in range(size_to_send)])) data_str_offset = 2 s.ins.on_recv(CANFD(identifier=0x241, data=dhex(data_str))) msg, ts = s.ins.rx_queue.recv() assert msg == dhex(data_str[data_str_offset:]) = Test on_recv function with empty frame with ISOTPSoftSocket(TestSocket(CAN), tx_id=0x641, rx_id=0x241) as s: s.ins.on_recv(CAN(identifier=0x241, data=b"")) assert s.ins.rx_queue.empty() = Test on_recv function with single frame and extended addressing with ISOTPSoftSocket(TestSocket(CAN), tx_id=0x641, rx_id=0x241, rx_ext_address=0xea) as s: cf = CAN(identifier=0x241, data=dhex("EA 05 01 02 03 04 05")) s.ins.on_recv(cf) msg, ts = s.ins.rx_queue.recv() assert msg == dhex("01 02 03 04 05") assert ts == cf.time = Test on_recv function with single frame and extended addressing FD with ISOTPSoftSocket(TestSocket(CANFD), tx_id=0x641, rx_id=0x241, rx_ext_address=0xea, fd=True) as s: pl_sizes_testings = [1, 5, 7, 8, 15, 20, 35, 40, 46, 62] data_str = "" data_str_offset = 0 for size_to_send in pl_sizes_testings: if size_to_send > 7: data_str = "EA 00 {} {}".format("%02X" % size_to_send, " ".join(["%02X" % x for x in range(size_to_send)])) data_str_offset = 8 else: data_str = "EA {} {}".format("%02X" % size_to_send, " ".join(["%02X" % x for x in range(size_to_send)])) data_str_offset = 5 cf = CANFD(identifier=0x241, data=dhex(data_str)) s.ins.on_recv(cf) msg, ts = s.ins.rx_queue.recv() assert msg == dhex(data_str[data_str_offset:]) assert ts == cf.time = CF is sent when first frame is received cans = TestSocket(CAN) can_out = TestSocket(CAN) cans.pair(can_out) with ISOTPSoftSocket(cans, tx_id=0x641, rx_id=0x241) as s: s.ins.on_recv(CAN(identifier=0x241, data=dhex("10 20 01 02 03 04 05 06"))) can = can_out.sniff(timeout=1, count=1)[0] assert can.identifier == 0x641 assert can.data == dhex("30 00 00") cans.close() can_out.close() + Testing ISOTPSoftSocket with an actual CAN socket = Verify that packets are not lost if they arrive before the sniff() is called with TestSocket(CAN) as ss, TestSocket(CAN) as sr: ss.pair(sr) tx_func = lambda: ss.send(CAN(identifier=0x111, data=b"\x01\x23\x45\x67")) p = sr.sniff(count=1, timeout=0.2, started_callback=tx_func) assert len(p)==1 tx_func = lambda: ss.send(CAN(identifier=0x111, data=b"\x89\xab\xcd\xef")) p = sr.sniff(count=1, timeout=0.2, started_callback=tx_func) assert len(p)==1 = Send single frame ISOTP message, using send with TestSocket(CAN) as isocan, \ ISOTPSoftSocket(isocan, tx_id=0x641, rx_id=0x241) as s, \ TestSocket(CAN) as cans: cans.pair(isocan) can = cans.sniff(timeout=2, count=1, started_callback=lambda: s.send(ISOTP(data=dhex("01 02 03 04 05")))) assert can[0].identifier == 0x641 assert can[0].data == dhex("05 01 02 03 04 05") = Send many single frame ISOTP messages, using send with TestSocket(CAN) as isocan, \ ISOTPSoftSocket(isocan, tx_id=0x641, rx_id=0x241) as s, \ TestSocket(CAN) as cans: cans.pair(isocan) for i in range(100): data = dhex("01 02 03 04 05") + struct.pack("B", i) expected = struct.pack("B", len(data)) + data can = cans.sniff(timeout=4, count=1, started_callback=lambda: s.send(ISOTP(data=data))) assert can[0].identifier == 0x641 print(can[0].data, data) assert can[0].data == expected = Send two-frame ISOTP message, using send with TestSocket(CAN) as isocan, ISOTPSoftSocket(isocan, tx_id=0x641, rx_id=0x241) as s, TestSocket(CAN) as cans: cans.pair(isocan) can = cans.sniff(timeout=1, count=1, started_callback=lambda: s.send(ISOTP(data=dhex("01 02 03 04 05 06 07 08")))) assert can[0].identifier == 0x641 assert can[0].data == dhex("10 08 01 02 03 04 05 06") can = cans.sniff(timeout=1, count=1, started_callback=lambda: cans.send(CAN(identifier = 0x241, data=dhex("30 00 00")))) assert can[0].identifier == 0x641 assert can[0].data == dhex("21 07 08") = Send two-frame ISOTP message, using send FD with TestSocket(CANFD) as isocan, ISOTPSoftSocket(isocan, tx_id=0x641, rx_id=0x241, fd=True) as s, TestSocket(CANFD) as cans: size_to_send = 100 max_pl_size = 62 data_str = "{}".format(" ".join(["%02X" % x for x in range(size_to_send)])) cans.pair(isocan) can = cans.sniff(timeout=1, count=1, started_callback=lambda: s.send(dhex(data_str))) assert can[0].identifier == 0x641 assert can[0].data == dhex("10 {} {}".format("%02X" % size_to_send, " ".join(["%02X" % x for x in range(max_pl_size)]))) can = cans.sniff(timeout=1, count=1, started_callback=lambda: cans.send(CANFD(identifier = 0x241, data=dhex("30 00 00")))) assert can[0].identifier == 0x641 assert can[0].data == dhex("21 {}".format(" ".join(["%02X" % x for x in range(max_pl_size, size_to_send)]))) = Send single frame ISOTP message with TestSocket(CAN) as cans, TestSocket(CAN) as isocan, ISOTPSoftSocket(isocan, tx_id=0x641, rx_id=0x241) as s: cans.pair(isocan) s.send(ISOTP(data=dhex("01 02 03 04 05"))) can = cans.sniff(timeout=1, count=1) assert can[0].identifier == 0x641 assert can[0].data == dhex("05 01 02 03 04 05") = Send two-frame ISOTP message acks = TestSocket(CAN) acker_ready = threading.Event() def acker(): acker_ready.set() can_pkt = acks.sniff(timeout=1, count=1) can = can_pkt[0] acks.send(CAN(identifier = 0x241, data=dhex("30 00 00"))) thread = Thread(target=acker) thread.start() acker_ready.wait(timeout=5) with TestSocket(CAN) as isocan, ISOTPSoftSocket(isocan, tx_id=0x641, rx_id=0x241) as s, TestSocket(CAN) as cans: cans.pair(isocan) cans.pair(acks) isocan.pair(acks) s.send(ISOTP(data=dhex("01 02 03 04 05 06 07 08"))) pkts = cans.sniff(timeout=1, count=1) if not len(pkts): s.failure_analysis() raise Scapy_Exception("ERROR") can = pkts[0] assert can.identifier == 0x641 assert can.data == dhex("10 08 01 02 03 04 05 06") pkts = cans.sniff(timeout=1, count=1) if not len(pkts): s.failure_analysis() raise Scapy_Exception("ERROR") can = pkts[0] assert can.identifier == 0x241 assert can.data == dhex("30 00 00") pkts = cans.sniff(timeout=1, count=1) if not len(pkts): s.failure_analysis() raise Scapy_Exception("ERROR") can = pkts[0] assert can.identifier == 0x641 assert can.data == dhex("21 07 08") thread.join(15) acks.close() assert not thread.is_alive() = Send two-frame ISOTP message FD acks = TestSocket(CANFD) acker_ready = threading.Event() def acker(): acker_ready.set() can_pkt = acks.sniff(timeout=1, count=1) can = can_pkt[0] acks.send(CANFD(identifier = 0x241, data=dhex("30 00 00"))) thread = Thread(target=acker) thread.start() acker_ready.wait(timeout=5) with TestSocket(CANFD) as isocan, ISOTPSoftSocket(isocan, tx_id=0x641, rx_id=0x241, fd=True) as s, TestSocket(CANFD) as cans: size_to_send = 123 max_pl_size = 62 data_str = "{}".format(" ".join(["%02X" % x for x in range(size_to_send)])) cans.pair(isocan) cans.pair(acks) isocan.pair(acks) s.send(dhex(data_str)) pkts = cans.sniff(timeout=1, count=1) if not len(pkts): s.failure_analysis() raise Scapy_Exception("ERROR") can = pkts[0] assert can.identifier == 0x641 assert can.data == dhex("10 {} {}".format("%02X" % size_to_send, " ".join(["%02X" % x for x in range(max_pl_size)]))) pkts = cans.sniff(timeout=1, count=1) if not len(pkts): s.failure_analysis() raise Scapy_Exception("ERROR") can = pkts[0] assert can.identifier == 0x241 assert can.data == dhex("30 00 00") pkts = cans.sniff(timeout=1, count=1) if not len(pkts): s.failure_analysis() raise Scapy_Exception("ERROR") can = pkts[0] assert can.identifier == 0x641 assert can.data == dhex("21 {}".format(" ".join(["%02X" % x for x in range(max_pl_size, size_to_send)]))) thread.join(15) acks.close() assert not thread.is_alive() = Send two-frame ISOTP message with bs acks = TestSocket(CAN) acker_ready = threading.Event() def acker(): acker_ready.set() can_pkt = acks.sniff(timeout=1, count=1) acks.send(CAN(identifier = 0x241, data=dhex("30 20 00"))) thread = Thread(target=acker) thread.start() acker_ready.wait(timeout=5) with TestSocket(CAN) as isocan, ISOTPSoftSocket(isocan, tx_id=0x641, rx_id=0x241) as s, TestSocket(CAN) as cans: cans.pair(isocan) cans.pair(acks) isocan.pair(acks) s.send(ISOTP(data=dhex("01 02 03 04 05 06 07 08"))) pkts = cans.sniff(timeout=1, count=1) if not len(pkts): s.failure_analysis() raise Scapy_Exception("ERROR") can = pkts[0] assert can.identifier == 0x641 assert can.data == dhex("10 08 01 02 03 04 05 06") pkts = cans.sniff(timeout=1, count=1) if not len(pkts): s.failure_analysis() raise Scapy_Exception("ERROR") can = pkts[0] assert can.identifier == 0x241 assert can.data == dhex("30 20 00") pkts = cans.sniff(timeout=1, count=1) if not len(pkts): s.failure_analysis() raise Scapy_Exception("ERROR") can = pkts[0] assert can.identifier == 0x641 assert can.data == dhex("21 07 08") thread.join(15) acks.close() assert not thread.is_alive() = Send two-frame ISOTP message with bs FD acks = TestSocket(CANFD) acker_ready = threading.Event() def acker(): acker_ready.set() can_pkt = acks.sniff(timeout=1, count=1) acks.send(CANFD(identifier = 0x241, data=dhex("30 20 00"))) thread = Thread(target=acker) thread.start() acker_ready.wait(timeout=5) with TestSocket(CANFD) as isocan, ISOTPSoftSocket(isocan, tx_id=0x641, rx_id=0x241, fd=True) as s, TestSocket(CANFD) as cans: size_to_send = 124 max_pl_size = 62 data_str = "{}".format(" ".join(["%02X" % x for x in range(size_to_send)])) cans.pair(isocan) cans.pair(acks) isocan.pair(acks) s.send(ISOTP(data=dhex(data_str))) pkts = cans.sniff(timeout=1, count=1) if not len(pkts): s.failure_analysis() raise Scapy_Exception("ERROR") can = pkts[0] assert can.identifier == 0x641 assert can.data == dhex("10 {} {}".format("%02X" % size_to_send, " ".join(["%02X" % x for x in range(max_pl_size)]))) pkts = cans.sniff(timeout=1, count=1) if not len(pkts): s.failure_analysis() raise Scapy_Exception("ERROR") can = pkts[0] assert can.identifier == 0x241 assert can.data == dhex("30 20 00") pkts = cans.sniff(timeout=1, count=1) if not len(pkts): s.failure_analysis() raise Scapy_Exception("ERROR") can = pkts[0] assert can.identifier == 0x641 assert can.data == dhex("21 {}".format(" ".join(["%02X" % x for x in range(max_pl_size, size_to_send)]))) thread.join(15) acks.close() assert not thread.is_alive() = Send two-frame ISOTP message with ST acks = TestSocket(CAN) acker_ready = threading.Event() def acker(): acker_ready.set() acks.sniff(timeout=1, count=1) acks.send(CAN(identifier = 0x241, data=dhex("30 00 10"))) thread = Thread(target=acker) thread.start() acker_ready.wait(timeout=5) with TestSocket(CAN) as isocan, ISOTPSoftSocket(isocan, tx_id=0x641, rx_id=0x241) as s, TestSocket(CAN) as cans: cans.pair(isocan) cans.pair(acks) isocan.pair(acks) s.send(ISOTP(data=dhex("01 02 03 04 05 06 07 08"))) pkts = cans.sniff(timeout=1, count=1) if not len(pkts): s.failure_analysis() raise Scapy_Exception("ERROR") can = pkts[0] assert can.identifier == 0x641 assert can.data == dhex("10 08 01 02 03 04 05 06") pkts = cans.sniff(timeout=1, count=1) if not len(pkts): s.failure_analysis() raise Scapy_Exception("ERROR") can = pkts[0] assert can.identifier == 0x241 assert can.data == dhex("30 00 10") pkts = cans.sniff(timeout=1, count=1) if not len(pkts): s.failure_analysis() raise Scapy_Exception("ERROR") can = pkts[0] assert can.identifier == 0x641 assert can.data == dhex("21 07 08") thread.join(15) acks.close() assert not thread.is_alive() = Send two-frame ISOTP message with ST FD acks = TestSocket(CANFD) acker_ready = threading.Event() def acker(): acker_ready.set() acks.sniff(timeout=1, count=1) acks.send(CANFD(identifier = 0x241, data=dhex("30 00 10"))) thread = Thread(target=acker) thread.start() acker_ready.wait(timeout=5) with TestSocket(CANFD) as isocan, ISOTPSoftSocket(isocan, tx_id=0x641, rx_id=0x241, fd=True) as s, TestSocket(CANFD) as cans: size_to_send = 124 max_pl_size = 62 data_str = "{}".format(" ".join(["%02X" % x for x in range(size_to_send)])) cans.pair(isocan) cans.pair(acks) isocan.pair(acks) s.send(dhex(data_str)) pkts = cans.sniff(timeout=1, count=1) if not len(pkts): s.failure_analysis() raise Scapy_Exception("ERROR") can = pkts[0] assert can.identifier == 0x641 assert can.data == dhex("10 {} {}".format("%02X" % size_to_send, " ".join(["%02X" % x for x in range(max_pl_size)]))) pkts = cans.sniff(timeout=1, count=1) if not len(pkts): s.failure_analysis() raise Scapy_Exception("ERROR") can = pkts[0] assert can.identifier == 0x241 assert can.data == dhex("30 00 10") pkts = cans.sniff(timeout=1, count=1) if not len(pkts): s.failure_analysis() raise Scapy_Exception("ERROR") can = pkts[0] assert can.identifier == 0x641 assert can.data == dhex("21 {}".format(" ".join(["%02X" % x for x in range(max_pl_size, size_to_send)]))) thread.join(15) acks.close() assert not thread.is_alive() = Receive a single frame ISOTP message with TestSocket(CAN) as isocan, ISOTPSoftSocket(isocan, tx_id=0x641, rx_id=0x241) as s, TestSocket(CAN) as cans: cans.pair(isocan) cans.send(CAN(identifier = 0x241, data = dhex("05 01 02 03 04 05"))) pkts = s.sniff(count=1, timeout=2) if not len(pkts): s.failure_analysis() raise Scapy_Exception("ERROR") isotp = pkts[0] assert isotp.data == dhex("01 02 03 04 05") assert isotp.tx_id == 0x641 assert isotp.rx_id == 0x241 assert isotp.ext_address == None assert isotp.rx_ext_address == None = Receive a single frame ISOTP message, with extended addressing with TestSocket(CAN) as isocan, ISOTPSoftSocket(isocan, tx_id=0x641, rx_id=0x241, ext_address=0xc0, rx_ext_address=0xea) as s, TestSocket(CAN) as cans: cans.pair(isocan) cans.send(CAN(identifier = 0x241, data = dhex("EA 05 01 02 03 04 05"))) pkts = s.sniff(count=1, timeout=2) if not len(pkts): s.failure_analysis() raise Scapy_Exception("ERROR") isotp = pkts[0] assert isotp.data == dhex("01 02 03 04 05") assert isotp.tx_id == 0x641 assert isotp.rx_id == 0x241 assert isotp.ext_address == 0xc0 assert isotp.rx_ext_address == 0xea = Receive frames from CandumpReader candump_fd = BytesIO(b''' vcan0 541 [8] 10 0A DE AD BE EF AA AA vcan0 241 [3] 30 00 00 vcan0 541 [5] 21 AA AA AA AA vcan0 541 [8] 10 0A DE AD BE EF AA AA vcan0 541 [8] 10 0A DE AD BE EF AA AA vcan0 241 [3] 30 00 00 vcan0 541 [5] 21 AA AA AA AA vcan0 541 [8] 10 0A DE AD BE EF AA AA vcan0 241 [3] 30 00 00 vcan0 541 [5] 21 AA AA AA AA vcan0 541 [8] 10 0A DE AD BE EF AA AA vcan0 241 [3] 30 00 00 vcan0 541 [5] 21 AA AA AA AA vcan0 541 [8] 10 0A DE AD BE EF AA AA vcan0 241 [3] 30 00 00 vcan0 541 [5] 21 AA AA AA AA vcan0 541 [8] 10 0A DE AD BE EF AA AA vcan0 241 [3] 30 00 00 vcan0 541 [5] 21 AA AA AA AA''') with ISOTPSoftSocket(CandumpReader(candump_fd), tx_id=0x241, rx_id=0x541, listen_only=True) as s: pkts = s.sniff(timeout=2, count=6) assert len(pkts) == 6 if not len(pkts): s.failure_analysis() raise Scapy_Exception("ERROR") isotp = pkts[0] print(repr(isotp)) print(hex(isotp.tx_id)) print(hex(isotp.rx_id)) assert isotp.data == dhex("DE AD BE EF AA AA AA AA AA AA") assert isotp.tx_id == 0x241 assert isotp.rx_id == 0x541 = Receive frames from CandumpReader with ISOTPSniffer without extended addressing candump_fd = BytesIO(b''' vcan0 541 [8] 10 0A DE AD BE EF AA AA vcan0 241 [3] 30 00 00 vcan0 541 [5] 21 AA AA AA AA vcan0 541 [8] 10 0A DE AD BE EF AA AA vcan0 541 [8] 10 0A DE AD BE EF AA AA vcan0 241 [3] 30 00 00 vcan0 541 [5] 21 AA AA AA AA vcan0 541 [8] 10 0A DE AD BE EF AA AA vcan0 241 [3] 30 00 00 vcan0 541 [5] 21 AA AA AA AA vcan0 541 [8] 10 0A DE AD BE EF AA AA vcan0 241 [3] 30 00 00 vcan0 541 [5] 21 AA AA AA AA vcan0 541 [8] 10 0A DE AD BE EF AA AA vcan0 241 [3] 30 00 00 vcan0 541 [5] 21 AA AA AA AA vcan0 541 [8] 10 0A DE AD BE EF AA AA vcan0 241 [3] 30 00 00 vcan0 541 [5] 21 AA AA AA AA''') pkts = sniff(opened_socket=CandumpReader(candump_fd), session=ISOTPSession(use_ext_address=False), timeout=1) assert len(pkts) == 6 if not len(pkts): s.failure_analysis() raise Scapy_Exception("ERROR") isotp = pkts[0] assert isotp.data == dhex("DE AD BE EF AA AA AA AA AA AA") assert (isotp.rx_id == 0x541) = Receive frames from CandumpReader with ISOTPSniffer * all flow control frames are detected as single frame with extended address candump_fd = BytesIO(b''' vcan0 541 [8] 10 0A DE AD BE EF AA AA vcan0 241 [3] 30 00 00 vcan0 541 [5] 21 AA AA AA AA vcan0 541 [8] 10 0A DE AD BE EF AA AA vcan0 541 [8] 10 0A DE AD BE EF AA AA vcan0 241 [3] 30 00 00 vcan0 541 [5] 21 AA AA AA AA vcan0 541 [8] 10 0A DE AD BE EF AA AA vcan0 241 [3] 30 00 00 vcan0 541 [5] 21 AA AA AA AA vcan0 541 [8] 10 0A DE AD BE EF AA AA vcan0 241 [3] 30 00 00 vcan0 541 [5] 21 AA AA AA AA vcan0 541 [8] 10 0A DE AD BE EF AA AA vcan0 241 [3] 30 00 00 vcan0 541 [5] 21 AA AA AA AA vcan0 541 [8] 10 0A DE AD BE EF AA AA vcan0 241 [3] 30 00 00 vcan0 541 [5] 21 AA AA AA AA''') pkts = sniff(opened_socket=CandumpReader(candump_fd), session=ISOTPSession, timeout=1) if not len(pkts): s.failure_analysis() raise Scapy_Exception("ERROR") assert len(pkts) == 12 isotp = pkts[1] assert isotp.data == dhex("DE AD BE EF AA AA AA AA AA AA") assert (isotp.rx_id == 0x541) isotp = pkts[0] assert isotp.data == dhex("") assert (isotp.rx_id == 0x241) = Receive frames from CandumpReader with ISOTPSniffer and count * all flow control frames are detected as single frame with extended address candump_fd = BytesIO(b''' vcan0 541 [8] 10 0A DE AD BE EF AA AA vcan0 241 [3] 30 00 00 vcan0 541 [5] 21 AA AA AA AA vcan0 541 [8] 10 0A DE AD BE EF AA AA vcan0 541 [8] 10 0A DE AD BE EF AA AA vcan0 241 [3] 30 00 00 vcan0 541 [5] 21 AA AA AA AA vcan0 541 [8] 10 0A DE AD BE EF AA AA vcan0 241 [3] 30 00 00 vcan0 541 [5] 21 AA AA AA AA vcan0 541 [8] 10 0A DE AD BE EF AA AA vcan0 241 [3] 30 00 00 vcan0 541 [5] 21 AA AA AA AA vcan0 541 [8] 10 0A DE AD BE EF AA AA vcan0 241 [3] 30 00 00 vcan0 541 [5] 21 AA AA AA AA vcan0 541 [8] 10 0A DE AD BE EF AA AA vcan0 241 [3] 30 00 00 vcan0 541 [5] 21 AA AA AA AA''') pkts = sniff(opened_socket=CandumpReader(candump_fd), session=ISOTPSession, timeout=1, count=2) if not len(pkts): s.failure_analysis() raise Scapy_Exception("ERROR") assert len(pkts) == 2 isotp = pkts[1] assert isotp.data == dhex("DE AD BE EF AA AA AA AA AA AA") assert (isotp.rx_id == 0x541) isotp = pkts[0] assert isotp.data == dhex("") assert (isotp.rx_id == 0x241) = Receive a two-frame ISOTP message with TestSocket(CAN) as isocan, ISOTPSoftSocket(isocan, tx_id=0x641, rx_id=0x241) as s, TestSocket(CAN) as cans: cans.pair(isocan) cans.send(CAN(identifier = 0x241, data = dhex("10 0B 01 02 03 04 05 06"))) cans.send(CAN(identifier = 0x241, data = dhex("21 07 08 09 10 11"))) pkts = s.sniff(count=1, timeout=1) if not len(pkts): s.failure_analysis() raise Scapy_Exception("ERROR") isotp = pkts[0] assert isotp.data == dhex("01 02 03 04 05 06 07 08 09 10 11") = Check what happens when a CAN frame with wrong identifier gets received with TestSocket(CAN) as isocan, ISOTPSoftSocket(isocan, tx_id=0x641, rx_id=0x241) as s, TestSocket(CAN) as cans: cans.pair(isocan) cans.send(CAN(identifier = 0x141, data = dhex("05 01 02 03 04 05"))) assert s.ins.rx_queue.empty() + Testing ISOTPSoftSocket timeouts = Check if not sending the last CF will make the socket timeout with TestSocket(CAN) as isocan, ISOTPSoftSocket(isocan, tx_id=0x641, rx_id=0x241) as s, TestSocket(CAN) as cans: cans.pair(isocan) cans.send(CAN(identifier = 0x241, data = dhex("10 11 01 02 03 04 05 06"))) cans.send(CAN(identifier = 0x241, data = dhex("21 07 08 09 0A 0B 0C 0D"))) isotp = s.sniff(timeout=0.1) assert len(isotp) == 0 = Check if not sending the first CF will make the socket timeout with TestSocket(CAN) as isocan, ISOTPSoftSocket(isocan, tx_id=0x641, rx_id=0x241) as s, TestSocket(CAN) as cans: cans.pair(isocan) cans.send(CAN(identifier = 0x241, data = dhex("10 11 01 02 03 04 05 06"))) isotp = s.sniff(timeout=0.1) assert len(isotp) == 0 = Check if not sending the first FC will make the socket timeout # drain log_stream log_stream.getvalue() isotp = ISOTP(data=dhex("01 02 03 04 05 06 07 08 09 0A")) with TestSocket(CAN) as isocan, ISOTPSoftSocket(isocan, tx_id=0x641, rx_id=0x241) as s, TestSocket(CAN) as cans: cans.pair(isocan) s.send(isotp) time.sleep(1.3) assert "TX state was reset due to timeout" in log_stream.getvalue() = Check if not sending the second FC will make the socket timeout # drain log_stream log_stream.getvalue() isotp = ISOTP(data=b"\xa5" * 120) cans = TestSocket(CAN) isocan = TestSocket(CAN) cans.pair(isocan) acker = AsyncSniffer(store=False, opened_socket=cans, prn=lambda x: cans.send(CAN(identifier = 0x241, data=dhex("30 04 00"))), count=1, timeout=1) acker.start() with ISOTPSoftSocket(isocan, tx_id=0x641, rx_id=0x241) as s: s.send(isotp) time.sleep(1.3) acker.join(timeout=5) cans.close() isocan.close() assert "TX state was reset due to timeout" in log_stream.getvalue() = Check if reception of an overflow FC will make a send fail log_stream.getvalue() isotp = ISOTP(data=b"\xa5" * 120) cans = TestSocket(CAN) isocan = TestSocket(CAN) cans.pair(isocan) acker = AsyncSniffer(store=False, opened_socket=cans, prn=lambda x: cans.send( CAN(identifier = 0x241, data=dhex("32 00 00"))), count=1, timeout=1) acker.start() with ISOTPSoftSocket(isocan, tx_id=0x641, rx_id=0x241) as s: s.send(isotp) time.sleep(1.3) acker.join(timeout=5) cans.close() isocan.close() assert "Overflow happened at the receiver side" in log_stream.getvalue() + More complex operations = ISOTPSoftSocket sr1 msg = ISOTP(b'\x11\x22\x33\x11\x22\x33\x11\x22\x33\x11\x22\x33') with TestSocket(CAN) as isocan_tx, ISOTPSoftSocket(isocan_tx, 0x123, 0x321) as sock_tx, \ TestSocket(CAN) as isocan_rx, ISOTPSoftSocket(isocan_rx, 0x321, 0x123) as sock_rx: isocan_rx.pair(isocan_tx) sniffer = AsyncSniffer(opened_socket=sock_rx, timeout=1, count=1, prn=lambda x: sock_rx.send(msg)) sniffer.start() rx2 = sock_tx.sr1(msg, timeout=3, verbose=True) sniffer.join(timeout=1) rx = sniffer.results[0] assert rx == msg assert rx2 is not None assert rx2 == msg = ISOTPSoftSocket sniff msg = ISOTP(b'\x11\x22\x33\x11\x22\x33\x11\x22\x33\x11\x22\x33') with TestSocket(CAN) as isocan1, ISOTPSoftSocket(isocan1, 0x123, 0x321) as sock, \ TestSocket(CAN) as isocan, ISOTPSoftSocket(isocan, 0x321, 0x123) as rx_sock: isocan1.pair(isocan) msg.data += b'0' sock.send(msg) msg.data += b'1' sock.send(msg) msg.data += b'2' sock.send(msg) msg.data += b'3' sock.send(msg) msg.data += b'4' sock.send(msg) rx = rx_sock.sniff(count=5, timeout=5) msg = ISOTP(b'\x11\x22\x33\x11\x22\x33\x11\x22\x33\x11\x22\x33') msg.data += b'0' assert rx[0] == msg msg.data += b'1' assert rx[1] == msg msg.data += b'2' assert rx[2] == msg msg.data += b'3' assert rx[3] == msg msg.data += b'4' assert rx[4] == msg + ISOTPSoftSocket MITM attack tests = bridge and sniff with isotp soft sockets set up vcan0 and vcan1 for package forwarding vcan1 succ = False with TestSocket(CAN) as can0_0, \ TestSocket(CAN) as can0_1, \ TestSocket(CAN) as can1_0, \ TestSocket(CAN) as can1_1, \ ISOTPSoftSocket(can0_0, tx_id=0x241, rx_id=0x641) as isoTpSocket0, \ ISOTPSoftSocket(can1_0, tx_id=0x541, rx_id=0x141) as isoTpSocket1, \ ISOTPSoftSocket(can0_1, tx_id=0x641, rx_id=0x241) as bSocket0, \ ISOTPSoftSocket(can1_1, tx_id=0x141, rx_id=0x141) as bSocket1: can0_0.pair(can0_1) can1_1.pair(can1_0) evt = threading.Event() def forwarding(pkt): global forwarded forwarded += 1 return pkt def bridge(): global forwarded, succ forwarded = 0 bridge_and_sniff(if1=bSocket0, if2=bSocket1, xfrm12=forwarding, xfrm21=forwarding, timeout=1.5, started_callback=evt.set, count=1) succ = True threadBridge = threading.Thread(target=bridge) threadBridge.start() evt.wait(timeout=5) packetsVCan1 = isoTpSocket1.sniff(timeout=1.5, count=1, started_callback=lambda: isoTpSocket0.send(ISOTP(b'Request'))) threadBridge.join(timeout=5) assert not threadBridge.is_alive() assert forwarded == 1 assert len(packetsVCan1) == 1 assert succ = bridge and sniff with isotp soft sockets and multiple long packets N = 3 T = 3 succ = False with TestSocket(CAN) as can0_0, \ TestSocket(CAN) as can0_1, \ TestSocket(CAN) as can1_0, \ TestSocket(CAN) as can1_1, \ ISOTPSoftSocket(can0_0, tx_id=0x241, rx_id=0x641) as isoTpSocket0, \ ISOTPSoftSocket(can1_0, tx_id=0x541, rx_id=0x141) as isoTpSocket1, \ ISOTPSoftSocket(can0_1, tx_id=0x641, rx_id=0x241) as bSocket0, \ ISOTPSoftSocket(can1_1, tx_id=0x141, rx_id=0x541) as bSocket1: can0_0.pair(can0_1) can1_1.pair(can1_0) evt = threading.Event() def forwarding(pkt): global forwarded forwarded += 1 return pkt def bridge(): global forwarded, succ forwarded = 0 bridge_and_sniff(if1=bSocket0, if2=bSocket1, xfrm12=forwarding, xfrm21=forwarding, timeout=T, count=N, started_callback=evt.set) succ = True threadBridge = threading.Thread(target=bridge) threadBridge.start() evt.wait(timeout=5) for _ in range(N): isoTpSocket0.send(ISOTP(b'RequestASDF1234567890')) packetsVCan1 = isoTpSocket1.sniff(timeout=T, count=N) threadBridge.join(timeout=5) assert not threadBridge.is_alive() assert forwarded == N assert len(packetsVCan1) == N assert succ = bridge and sniff with isotp soft sockets set up vcan0 and vcan1 for package change vcan1 succ = False with TestSocket(CAN) as can0_0, \ TestSocket(CAN) as can0_1, \ TestSocket(CAN) as can1_0, \ TestSocket(CAN) as can1_1, \ ISOTPSoftSocket(can0_0, tx_id=0x241, rx_id=0x641) as isoTpSocket0, \ ISOTPSoftSocket(can1_0, tx_id=0x641, rx_id=0x241) as isoTpSocket1, \ ISOTPSoftSocket(can0_1, tx_id=0x641, rx_id=0x241) as bSocket0, \ ISOTPSoftSocket(can1_1, tx_id=0x241, rx_id=0x641) as bSocket1: can0_0.pair(can0_1) can1_1.pair(can1_0) evt = threading.Event() def forwarding(pkt): pkt.data = 'changed' return pkt def bridge(): global succ bridge_and_sniff(if1=bSocket0, if2=bSocket1, xfrm12=forwarding, xfrm21=forwarding, timeout=3, started_callback=evt.set, count=1) succ = True threadBridge = threading.Thread(target=bridge) threadBridge.start() evt.wait(timeout=5) packetsVCan1 = isoTpSocket1.sniff(timeout=2, count=1, started_callback=lambda: isoTpSocket0.send(ISOTP(b'Request'))) threadBridge.join(timeout=5) assert not threadBridge.is_alive() assert len(packetsVCan1) == 1 assert packetsVCan1[0].data == b'changed' assert succ = Two ISOTPSoftSockets at the same time, sending and receiving with TestSocket(CAN) as cs1, ISOTPSoftSocket(cs1, tx_id=0x641, rx_id=0x241) as s1, \ TestSocket(CAN) as cs2, ISOTPSoftSocket(cs2, tx_id=0x241, rx_id=0x641) as s2: cs1.pair(cs2) isotp = ISOTP(data=b"\x10\x25" * 43) s2.send(isotp) result = s1.sniff(count=1, timeout=5) assert len(result) == 1 assert result[0].data == isotp.data = Two ISOTPSoftSockets at the same time, sending and receiving with tx_gap with TestSocket(CAN) as cs1, ISOTPSoftSocket(cs1, tx_id=0x641, rx_id=0x241, stmin=1) as s1, \ TestSocket(CAN) as cs2, ISOTPSoftSocket(cs2, tx_id=0x241, rx_id=0x641) as s2: cs1.pair(cs2) isotp = ISOTP(data=b"\x10\x25" * 43) s2.send(isotp) result = s1.sniff(count=1, timeout=5) assert len(result) == 1 assert result[0].data == isotp.data = Two ISOTPSoftSockets at the same time, multiple sends/receives def test(): with TestSocket(CAN) as cs1, ISOTPSoftSocket(cs1, tx_id=0x641, rx_id=0x241) as s1, \ TestSocket(CAN) as cs2, ISOTPSoftSocket(cs2, tx_id=0x241, rx_id=0x641) as s2: cs1.pair(cs2) for i in range(1, 40, 5): isotp = ISOTP(data=bytearray(range(i, i * 2))) s2.send(isotp) result = s1.sniff(count=8, timeout=5) print(result) for p in result: print(repr(p)) assert len(result) == 8 test() = Send a single frame ISOTP message with padding with TestSocket(CAN) as cs1, ISOTPSoftSocket(cs1, tx_id=0x641, rx_id=0x241, padding=True) as s: with TestSocket(CAN) as cans: cs1.pair(cans) s.send(ISOTP(data=dhex("01"))) pkts = cans.sniff(timeout=1, count=1) if not len(pkts): s.failure_analysis() raise Scapy_Exception("ERROR") res = pkts[0] assert res.length == 8 = Send a single frame ISOTP message with padding FD with TestSocket(CANFD) as cs1, ISOTPSoftSocket(cs1, tx_id=0x641, rx_id=0x241, padding=True, fd=True) as s: with TestSocket(CANFD) as cans: cs1.pair(cans) pl_sizes_testings = [1, 5, 7, 8, 9, 12, 15, 17, 20, 21, 27, 35, 40, 46, 50, 62] pl_sizes_expected = [8, 8, 8, 12, 12, 16, 20, 20, 24, 24, 32, 48, 48, 48, 64, 64] for i, pl_size in enumerate(pl_sizes_testings): s.send(dhex(" ".join(["%02X" % x for x in range(pl_size)]))) pkts = cans.sniff(timeout=1, count=1) if not len(pkts): s.failure_analysis() raise Scapy_Exception("ERROR") res = pkts[0] assert res.length == pl_sizes_expected[i] = Send a two-frame ISOTP message with padding acks = TestSocket(CAN) cans = TestSocket(CAN) acks.pair(cans) def send_ack(x): acks.send(CAN(identifier = 0x241, data=dhex("30 00 00"))) acker = AsyncSniffer(opened_socket=acks, store=False, prn=send_ack, timeout=1, count=1) acker.start() with TestSocket(CAN) as isocan, ISOTPSoftSocket(isocan, tx_id=0x641, rx_id=0x241, padding=True) as s: acks.pair(isocan) cans.pair(isocan) s.send(ISOTP(data=dhex("01 02 03 04 05 06 07 08"))) canpks = cans.sniff(timeout=1, count=3) acker.join(timeout=5) canpks.sort(key=lambda x:x.identifier) assert canpks[1].identifier == 0x641 assert canpks[1].data == dhex("10 08 01 02 03 04 05 06") assert canpks[0].identifier == 0x241 assert canpks[0].data == dhex("30 00 00") assert canpks[2].identifier == 0x641 assert canpks[2].data == dhex("21 07 08 CC CC CC CC CC") = Receive a padded single frame ISOTP message with padding disabled with TestSocket(CAN) as isocan, ISOTPSoftSocket(isocan, tx_id=0x641, rx_id=0x241, padding=False) as s: with TestSocket(CAN) as cans: cans.pair(isocan) cans.send(CAN(identifier=0x241, data=dhex("02 05 06 00 00 00 00 00"))) pkts = s.sniff(count=1, timeout=1) if not len(pkts): s.failure_analysis() raise Scapy_Exception("ERROR") res = pkts[0] assert res.data == dhex("05 06") = Receive a padded single frame ISOTP message with padding enabled with TestSocket(CAN) as isocan, ISOTPSoftSocket(isocan, tx_id=0x641, rx_id=0x241, padding=True) as s: with TestSocket(CAN) as cans: cans.pair(isocan) cans.send(CAN(identifier=0x241, data=dhex("02 05 06 00 00 00 00 00"))) pkts = s.sniff(count=1, timeout=1) if not len(pkts): s.failure_analysis() raise Scapy_Exception("ERROR") res = pkts[0] assert res.data == dhex("05 06") = Receive a non-padded single frame ISOTP message with padding enabled with TestSocket(CAN) as isocan, ISOTPSoftSocket(isocan, tx_id=0x641, rx_id=0x241, padding=True) as s: with TestSocket(CAN) as cans: cans.pair(isocan) cans.send(CAN(identifier=0x241, data=dhex("02 05 06"))) pkts = s.sniff(count=1, timeout=2) if not len(pkts): s.failure_analysis() raise Scapy_Exception("ERROR") res = pkts[0] assert res.data == dhex("05 06") = Receive a padded two-frame ISOTP message with padding enabled with TestSocket(CAN) as isocan, ISOTPSoftSocket(isocan, tx_id=0x641, rx_id=0x241, padding=True) as s: with TestSocket(CAN) as cans: cans.pair(isocan) cans.send(CAN(identifier=0x241, data=dhex("10 09 01 02 03 04 05 06"))) cans.send(CAN(identifier=0x241, data=dhex("21 07 08 09 00 00 00 00"))) pkts = s.sniff(count=1, timeout=1) if not len(pkts): s.failure_analysis() raise Scapy_Exception("ERROR") res = pkts[0] assert res.data == dhex("01 02 03 04 05 06 07 08 09") = Receive a padded two-frame ISOTP message with padding disabled with TestSocket(CAN) as isocan, ISOTPSoftSocket(isocan, tx_id=0x641, rx_id=0x241, padding=False) as s: with TestSocket(CAN) as cans: cans.pair(isocan) cans.send(CAN(identifier=0x241, data=dhex("10 09 01 02 03 04 05 06"))) cans.send(CAN(identifier=0x241, data=dhex("21 07 08 09 00 00 00 00"))) pkts = s.sniff(count=1, timeout=1) if not len(pkts): s.failure_analysis() raise Scapy_Exception("ERROR") res = pkts[0] res.show() assert res.data == dhex("01 02 03 04 05 06 07 08 09") + Cleanup = Delete testsockets cleanup_testsockets() TimeoutScheduler.clear() log_runtime.removeHandler(handler)