% Regression tests for isotp_scan ~ scanner + Configuration ~ conf = Imports from scapy.contrib.isotp.isotp_scanner import send_multiple_ext, filter_periodic_packets, scan_extended, scan from test.testsocket import TestSocket with open(scapy_path("test/contrib/automotive/interface_mockup.py")) as f: exec(f.read()) = Test send_multiple_ext() pkt = ISOTPHeaderEA(identifier=0x100, extended_address=1)/ISOTP_FF(message_size=100, data=b'\x00\x00\x00\x00\x00') number_of_packets = 100 with new_can_socket0() as sock1, new_can_socket0() as sock: send_multiple_ext(sock1, 0, pkt, number_of_packets) pkts = sock.sniff(timeout=4, count=number_of_packets) assert len(pkts) == number_of_packets = Test filter_periodic_packets() with periodic packets pkt = CAN(identifier=0x200, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08') received_packets = dict() for i in range(40): temp_pkt = pkt.copy() temp_pkt.time = i / 1000 received_packets[i] = (temp_pkt, temp_pkt.identifier) filter_periodic_packets(received_packets) assert len(received_packets) == 0 = Test filter_periodic_packets() with periodic packets and one outlier outlier = CAN(identifier=300, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08') outlier.time = 50 / 1000 pkt = CAN(identifier=0x200, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08') received_packets = dict() for i in range(40): temp_pkt = pkt.copy() temp_pkt.time = i / 1000 received_packets[i] = (temp_pkt, temp_pkt.identifier) received_packets[40] = (outlier, outlier.identifier) filter_periodic_packets(received_packets) assert len(received_packets) == 1 = Test filter_periodic_packets() with nonperiodic packets pkt = CAN(identifier=0x200, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08') received_packets = dict() for i in range(40): temp_pkt = pkt.copy() temp_pkt.time = (i * i) / 1000 received_packets[i] = (temp_pkt, temp_pkt.identifier) filter_periodic_packets(received_packets) assert len(received_packets) == 40 = define helper function def make_noise(p, t): for _ in range(20): sock_noise.send(p) time.sleep(t) = test scan sock_sender = TestSocket(CAN) sockets = list() for idx in range(1, 4): sock_recv = TestSocket(CAN) sock_sender.pair(sock_recv) sockets.append(ISOTPSoftSocket(sock_recv, tx_id=0x700 + idx, rx_id=0x600 + idx)) found_packets = scan(sock_sender, range(0x5ff, 0x604), noise_ids=[0x701], sniff_time=0.1) for s in sockets: s.close() assert len(found_packets) == 2 assert found_packets[0x602][0].identifier == 0x702 assert found_packets[0x603][0].identifier == 0x703 = test scan extended sock_sender = TestSocket(CAN) sock_recv = TestSocket(CAN) sock_sender.pair(sock_recv) with ISOTPSoftSocket(sock_recv, tx_id=0x700, rx_id=0x601, ext_address=0xaa, rx_ext_address=0xbb): found_packets = scan_extended(sock_sender, [0x600, 0x601], extended_scan_range=range(0xb0, 0xc0), sniff_time=0.1) fpkt = found_packets[list(found_packets.keys())[0]][0] rpkt = CAN(flags=0, identifier=0x700, length=4, data=b'\xaa0\x00\x00') assert fpkt.length == rpkt.length assert fpkt.data == rpkt.data assert fpkt.identifier == rpkt.identifier = scan with text output sock_sender = TestSocket(CAN) sock_recv1 = TestSocket(CAN) sock_sender.pair(sock_recv1) sock_recv2 = TestSocket(CAN) sock_sender.pair(sock_recv2) sock_noise = TestSocket(CAN) sock_sender.pair(sock_noise) with ISOTPSoftSocket(sock_recv1, tx_id=0x702, rx_id=0x602), ISOTPSoftSocket(sock_recv2, tx_id=0x703, rx_id=0x603): pkt = CAN(identifier=0x701, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08') make_noise(pkt, 0.01) result = isotp_scan(sock_sender, range(0x5ff, 0x604 + 1), output_format="text", noise_listen_time=0.1, sniff_time=0.02, verbose=False) text = "\nFound 2 ISOTP-FlowControl Packet(s):" assert text in result assert "0x602" in result assert "0x603" in result assert "0x702" in result assert "0x703" in result assert "No Padding" in result = scan with text output padding sock_sender = TestSocket(CAN) sock_recv1 = TestSocket(CAN) sock_sender.pair(sock_recv1) sock_recv2 = TestSocket(CAN) sock_sender.pair(sock_recv2) sock_noise = TestSocket(CAN) sock_sender.pair(sock_noise) with ISOTPSoftSocket(sock_recv1, tx_id=0x702, rx_id=0x602, padding=True), ISOTPSoftSocket(sock_recv2, tx_id=0x703, rx_id=0x603, padding=True): pkt = CAN(identifier=0x701, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08') make_noise(pkt, 0.01) result = isotp_scan(sock_sender, range(0x5ff, 0x604 + 1), output_format="text", noise_listen_time=0.1, sniff_time=0.02, verbose=False) text = "\nFound 2 ISOTP-FlowControl Packet(s):" assert text in result assert "0x602" in result assert "0x603" in result assert "0x702" in result assert "0x703" in result assert "Padding enabled" in result = scan with text output extended_can id sock_sender = TestSocket(CAN) sock_recv1 = TestSocket(CAN) sock_sender.pair(sock_recv1) sock_recv2 = TestSocket(CAN) sock_sender.pair(sock_recv2) sock_noise = TestSocket(CAN) sock_sender.pair(sock_noise) with ISOTPSoftSocket(sock_recv1, tx_id=0x1ffff702, rx_id=0x1ffff602), ISOTPSoftSocket(sock_recv2, tx_id=0x1ffff703, rx_id=0x1ffff603): pkt = CAN(identifier=0x1ffff701, flags="extended", length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08') make_noise(pkt, 0.01) result = isotp_scan(sock_sender, range(0x1ffff5ff, 0x1ffff604 + 1), output_format="text", noise_listen_time=0.1, sniff_time=0.02, extended_can_id=True, verbose=False) text = "\nFound 2 ISOTP-FlowControl Packet(s):" assert text in result assert "0x1ffff602" in result assert "0x1ffff603" in result assert "0x1ffff702" in result assert "0x1ffff703" in result assert "No Padding" in result = scan with code output sock_sender = TestSocket(CAN) sock_recv1 = TestSocket(CAN) sock_sender.pair(sock_recv1) sock_recv2 = TestSocket(CAN) sock_sender.pair(sock_recv2) sock_noise = TestSocket(CAN) sock_sender.pair(sock_noise) with ISOTPSoftSocket(sock_recv1, tx_id=0x702, rx_id=0x602), ISOTPSoftSocket(sock_recv2, tx_id=0x703, rx_id=0x603): pkt = CAN(identifier=0x701, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08') make_noise(pkt, 0.01) result = isotp_scan(sock_sender, range(0x5ff, 0x604 + 1), output_format="code", noise_listen_time=0.1, sniff_time=0.02, can_interface="can0", verbose=False) s1 = "ISOTPSocket(can0, tx_id=0x602, rx_id=0x702, " \ "padding=False, basecls=ISOTP)\n" s2 = "ISOTPSocket(can0, tx_id=0x603, rx_id=0x703, " \ "padding=False, basecls=ISOTP)\n" assert s1 in result assert s2 in result = scan with json output sock_sender = TestSocket(CAN) sock_recv1 = TestSocket(CAN) sock_sender.pair(sock_recv1) sock_recv2 = TestSocket(CAN) sock_sender.pair(sock_recv2) sock_noise = TestSocket(CAN) sock_sender.pair(sock_noise) with ISOTPSoftSocket(sock_recv1, tx_id=0x702, rx_id=0x602), ISOTPSoftSocket(sock_recv2, tx_id=0x703, rx_id=0x603): pkt = CAN(identifier=0x701, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08') make_noise(pkt, 0.01) result = isotp_scan(sock_sender, range(0x5ff, 0x604 + 1), output_format="json", noise_listen_time=0.1, sniff_time=0.02, can_interface="can0", verbose=False) s1 = "\"iface\": \"can0\", \"tx_id\": 1538, \"rx_id\": 1794, " \ "\"padding\": false, \"basecls\": \"ISOTP\"" s2 = "\"iface\": \"can0\", \"tx_id\": 1539, \"rx_id\": 1795, " \ "\"padding\": false, \"basecls\": \"ISOTP\"" print(result) assert s1 in result assert s2 in result = scan with code output noise sock_sender = TestSocket(CAN) sock_recv1 = TestSocket(CAN) sock_sender.pair(sock_recv1) sock_recv2 = TestSocket(CAN) sock_sender.pair(sock_recv2) sock_noise = TestSocket(CAN) sock_sender.pair(sock_noise) with ISOTPSoftSocket(sock_recv1, tx_id=0x702, rx_id=0x602), ISOTPSoftSocket(sock_recv2, tx_id=0x703, rx_id=0x603): pkt = CAN(identifier=0x702, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08') make_noise(pkt, 0.01) result = isotp_scan(sock_sender, range(0x5ff, 0x604 + 1), output_format="code", noise_listen_time=0.1, sniff_time=0.02, can_interface="can0", verbose=False) s1 = "ISOTPSocket(can0, tx_id=0x602, rx_id=0x702, " \ "padding=False, basecls=ISOTP)\n" s2 = "ISOTPSocket(can0, tx_id=0x603, rx_id=0x703, " \ "padding=False, basecls=ISOTP)\n" assert s1 not in result assert s2 in result = scan with code output extended_isotp sock_sender = TestSocket(CAN) sock_recv1 = TestSocket(CAN) sock_sender.pair(sock_recv1) sock_recv2 = TestSocket(CAN) sock_sender.pair(sock_recv2) sock_noise = TestSocket(CAN) sock_sender.pair(sock_noise) with ISOTPSoftSocket(sock_recv1, tx_id=0x702, rx_id=0x602, ext_address=0x11, rx_ext_address=0x22), ISOTPSoftSocket(sock_recv2, tx_id=0x703, rx_id=0x603, ext_address=0x11, rx_ext_address=0x22): pkt = CAN(identifier=0x701, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08') make_noise(pkt, 0.01) result = isotp_scan(sock_sender, range(0x5ff, 0x604 + 1), output_format="code", noise_listen_time=0.1, sniff_time=0.05, extended_scan_range=range(0x20, 0x30), extended_addressing=True, can_interface="can0", verbose=False) s1 = "ISOTPSocket(can0, tx_id=0x602, rx_id=0x702, padding=False, " \ "ext_address=0x22, rx_ext_address=0x11, basecls=ISOTP)" s2 = "ISOTPSocket(can0, tx_id=0x603, rx_id=0x703, padding=False, " \ "ext_address=0x22, rx_ext_address=0x11, basecls=ISOTP)" assert s1 in result assert s2 in result = scan with code output extended_isotp extended can id sock_sender = TestSocket(CAN) sock_recv1 = TestSocket(CAN) sock_sender.pair(sock_recv1) sock_recv2 = TestSocket(CAN) sock_sender.pair(sock_recv2) sock_noise = TestSocket(CAN) sock_sender.pair(sock_noise) with ISOTPSoftSocket(sock_recv1, tx_id=0x1ffff702, rx_id=0x1ffff602, ext_address=0x11, rx_ext_address=0x22), ISOTPSoftSocket(sock_recv2, tx_id=0x1ffff703, rx_id=0x1ffff603, ext_address=0x11, rx_ext_address=0x22): pkt = CAN(identifier=0x701, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08') make_noise(pkt, 0.01) result = isotp_scan(sock_sender, range(0x1ffff5ff, 0x1ffff604 + 1), output_format="code", noise_listen_time=0.1, sniff_time=0.05, extended_scan_range=range(0x20, 0x30), extended_addressing=True, can_interface="can0", verbose=False) s1 = "ISOTPSocket(can0, tx_id=0x1ffff602, rx_id=0x1ffff702, padding=False, " \ "ext_address=0x22, rx_ext_address=0x11, basecls=ISOTP)" s2 = "ISOTPSocket(can0, tx_id=0x1ffff603, rx_id=0x1ffff703, padding=False, " \ "ext_address=0x22, rx_ext_address=0x11, basecls=ISOTP)" print(result) assert s1 in result assert s2 in result = scan default output sock_sender = TestSocket(CAN) sock_recv1 = TestSocket(CAN) sock_sender.pair(sock_recv1) sock_recv2 = TestSocket(CAN) sock_sender.pair(sock_recv2) sock_noise = TestSocket(CAN) sock_sender.pair(sock_noise) with ISOTPSoftSocket(sock_recv1, tx_id=0x702, rx_id=0x602), ISOTPSoftSocket(sock_recv2, tx_id=0x703, rx_id=0x603): pkt = CAN(identifier=0x701, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08') make_noise(pkt, 0.01) result = isotp_scan(sock_sender, range(0x5ff, 0x604 + 1), noise_listen_time=0.1, sniff_time=0.02, can_interface=new_can_socket0(), verbose=False) assert 0x602 == result[0].tx_id assert 0x702 == result[0].rx_id assert 0x603 == result[1].tx_id assert 0x703 == result[1].rx_id for s in result: s.close() del s = scan default output extended sock_sender = TestSocket(CAN) sock_recv1 = TestSocket(CAN) sock_sender.pair(sock_recv1) sock_recv2 = TestSocket(CAN) sock_sender.pair(sock_recv2) sock_noise = TestSocket(CAN) sock_sender.pair(sock_noise) with ISOTPSoftSocket(sock_recv1, tx_id=0x702, rx_id=0x602, ext_address=0x11, rx_ext_address=0x22), ISOTPSoftSocket(sock_recv2, tx_id=0x703, rx_id=0x603, ext_address=0x11, rx_ext_address=0x22): pkt = CAN(identifier=0x701, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08') make_noise(pkt, 0.01) result = isotp_scan(sock_sender, range(0x5ff, 0x604 + 1), noise_listen_time=0.1, sniff_time=0.02, extended_scan_range=range(0x20, 0x30), extended_addressing=True, can_interface=new_can_socket0(), verbose=False) assert 0x602 == result[0].tx_id assert 0x702 == result[0].rx_id assert 0x22 == result[0].ext_address assert 0x11 == result[0].rx_ext_address assert 0x603 == result[1].tx_id assert 0x703 == result[1].rx_id assert 0x22 == result[1].ext_address assert 0x11 == result[1].rx_ext_address for s in result: s.close() del s + Cleanup = Delete vcan interfaces assert cleanup_interfaces() + Coverage stability tests = empty tests from scapy.contrib.isotp.isotp_scanner import generate_code_output, generate_text_output assert generate_code_output("", None) == "" assert generate_text_output("") == "No packets found." = get_isotp_fc from scapy.contrib.isotp.isotp_scanner import get_isotp_fc # to trigger "noise_ids.append(packet.identifier)" a = [] get_isotp_fc( 1, [], a, False, Bunch( flags="extended", identifier=1, data=b"\x00" ) ) assert 1 in a