% Regression tests for the DoIP layer # More information at http://www.secdev.org/projects/UTscapy/ ############ ############ + Doip contrib tests = Load Contrib Layer load_contrib("automotive.doip", globals_dict=globals()) load_contrib("automotive.uds", globals_dict=globals()) = Defaults test p = DoIP(payload_type=1) assert p.protocol_version == 0x02 assert p.inverse_version == 0xfd assert p.payload_length == None assert p.payload_type == 1 = Build test 0 p = DoIP(bytes(DoIP(payload_type=0))) assert p.protocol_version == 0x02 assert p.inverse_version == 0xfd assert p.payload_length == 1 assert p.payload_type == 0 assert p.nack == 0 = Build test 1 p = DoIP(bytes(DoIP(payload_type=1))) assert p.protocol_version == 0x02 assert p.inverse_version == 0xfd assert p.payload_length == 0 assert p.payload_type == 1 = Build test 2 p = DoIP(bytes(DoIP(payload_type=2))) assert p.protocol_version == 0x02 assert p.inverse_version == 0xfd assert p.payload_length == 6 assert p.payload_type == 2 assert bytes(p.eid) == b"\x00" * 6 = Build test 3 p = DoIP(bytes(DoIP(payload_type=3))) assert p.protocol_version == 0x02 assert p.inverse_version == 0xfd assert p.payload_length == 17 assert p.payload_type == 3 assert bytes(p.vin) == b"\x00" * 17 = Build test 4 p = DoIP(bytes(DoIP(payload_type=4))) assert p.protocol_version == 0x02 assert p.inverse_version == 0xfd assert p.payload_length == 33 assert p.payload_type == 4 assert bytes(p.vin) == b"\x00" * 17 assert p.logical_address == 0 assert bytes(p.eid) == b"\x00" * 6 assert bytes(p.gid) == b"\x00" * 6 assert p.further_action == 0 assert p.vin_gid_status == 0 = Build test 5 p = DoIP(bytes(DoIP(payload_type=5))) assert p.protocol_version == 0x02 assert p.inverse_version == 0xfd assert p.payload_length == 7 assert p.payload_type == 5 assert p.source_address == 0 assert p.activation_type == 0 assert p.reserved_iso == 0 assert p.reserved_oem == b"" = Build test 5.1 p = DoIP(bytes(DoIP(payload_type=5, reserved_oem=b"1234"))) assert p.protocol_version == 0x02 assert p.inverse_version == 0xfd assert p.payload_length == 11 assert p.payload_type == 5 assert p.source_address == 0 assert p.activation_type == 0 assert p.reserved_iso == 0 p.show() print(p.reserved_oem) assert p.reserved_oem == b"1234" = Build test 5.2 p = DoIP(bytes(DoIP(payload_type=5, reserved_oem=b"12"))) assert p.protocol_version == 0x02 assert p.inverse_version == 0xfd assert p.payload_length == 9 assert p.payload_type == 5 assert p.source_address == 0 assert p.activation_type == 0 assert p.reserved_iso == 0 assert p.reserved_oem == b"12" = Build test 6 p = DoIP(bytes(DoIP(payload_type=6))) assert p.protocol_version == 0x02 assert p.inverse_version == 0xfd assert p.payload_length == 9 assert p.payload_type == 6 assert p.logical_address_tester == 0 assert p.logical_address_doip_entity == 0 assert p.reserved_iso == 0 assert p.reserved_oem == b"" = Build test 7 p = DoIP(bytes(DoIP(payload_type=7))) assert p.protocol_version == 0x02 assert p.inverse_version == 0xfd assert p.payload_length == 0 assert p.payload_type == 7 = Build test 8 p = DoIP(bytes(DoIP(payload_type=8))) assert p.protocol_version == 0x02 assert p.inverse_version == 0xfd assert p.payload_length == 2 assert p.payload_type == 8 assert p.source_address == 0 = Build test 4001 p = DoIP(bytes(DoIP(payload_type=0x4001))) assert p.protocol_version == 0x02 assert p.inverse_version == 0xfd assert p.payload_length == 0 assert p.payload_type == 0x4001 = Build test 4002 p = DoIP(bytes(DoIP(payload_type=0x4002))) assert p.protocol_version == 0x02 assert p.inverse_version == 0xfd assert p.payload_length == 7 assert p.payload_type == 0x4002 assert p.node_type == 0 assert p.max_open_sockets == 1 assert p.cur_open_sockets == 0 assert p.max_data_size == 0 = Build test 4003 p = DoIP(bytes(DoIP(payload_type=0x4003))) assert p.protocol_version == 0x02 assert p.inverse_version == 0xfd assert p.payload_length == 0 assert p.payload_type == 0x4003 = Build test 4004 p = DoIP(bytes(DoIP(payload_type=0x4004))) assert p.protocol_version == 0x02 assert p.inverse_version == 0xfd assert p.payload_length == 1 assert p.payload_type == 0x4004 assert p.diagnostic_power_mode == 0 = Build test 8001 p = DoIP(bytes(DoIP(payload_type=0x8001))) assert p.protocol_version == 0x02 assert p.inverse_version == 0xfd assert p.payload_length == 4 assert p.payload_type == 0x8001 assert p.source_address == 0 assert p.target_address == 0 = Build test 8002 p = DoIP(bytes(DoIP(payload_type=0x8002))) assert p.protocol_version == 0x02 assert p.inverse_version == 0xfd assert p.payload_length == 5 assert p.payload_type == 0x8002 assert p.source_address == 0 assert p.target_address == 0 assert p.ack_code == 0 assert p.previous_msg == b'' p = DoIP(bytes(DoIP(payload_type=0x8002, previous_msg=b'\x22\xfd\x32'))) assert p.protocol_version == 0x02 assert p.inverse_version == 0xfd assert p.payload_length == 8 assert p.payload_type == 0x8002 assert p.source_address == 0 assert p.target_address == 0 assert p.ack_code == 0 assert p.previous_msg == b'\x22\xfd\x32' p = DoIP(bytes(DoIP(payload_type=0x8002, previous_msg=b'\x19\x02\x09\x9C\x00'))) assert p.protocol_version == 0x02 assert p.inverse_version == 0xfd assert p.payload_length == 10 assert p.payload_type == 0x8002 assert p.source_address == 0 assert p.target_address == 0 assert p.ack_code == 0 assert p.previous_msg == b'\x19\x02\t\x9c\x00' p = DoIP(b'\x02\xfd\x80\x02\x00\x00\x00\x07\x00\x08\x00\x0e\x00\x10\x01') assert p.protocol_version == 0x02 assert p.inverse_version == 0xFD assert p.payload_length == 7 assert p.payload_type == 0x8002 assert p.source_address == 0x8 assert p.target_address == 0xE assert p.ack_code == 0 assert p.previous_msg == b'\x10\x01' = Build test 8003 p = DoIP(bytes(DoIP(payload_type=0x8003))) assert p.protocol_version == 0x02 assert p.inverse_version == 0xfd assert p.payload_length == 5 assert p.payload_type == 0x8003 assert p.source_address == 0 assert p.target_address == 0 assert p.nack_code == 0 p = DoIP(bytes(DoIP(payload_type=0x8003, previous_msg=b'\x2E\xfd\x32\x01\x02'))) assert p.protocol_version == 0x02 assert p.inverse_version == 0xfd assert p.payload_length == 10 assert p.payload_type == 0x8003 assert p.source_address == 0 assert p.target_address == 0 assert p.nack_code == 0 assert p.previous_msg == b'.\xfd2\x01\x02' p = DoIP(bytes(DoIP(payload_type=0x8003, previous_msg=b'\x19\x02\x09\x9A\x00'))) assert p.protocol_version == 0x02 assert p.inverse_version == 0xfd assert p.payload_length == 10 assert p.payload_type == 0x8003 assert p.source_address == 0 assert p.target_address == 0 assert p.nack_code == 0 assert p.previous_msg == b'\x19\x02\t\x9a\x00' p = DoIP(b'\x02\xfd\x80\x03\x00\x00\x00\x07\x00\x0A\x00\x0C\x00\x10\x03') assert p.protocol_version == 0x02 assert p.inverse_version == 0xFD assert p.payload_length == 7 assert p.payload_type == 0x8003 assert p.source_address == 0xA assert p.target_address == 0xC assert p.nack_code == 0 assert p.previous_msg == b'\x10\x03' + pcap based tests = read diag_ack pcap file pkt = rdpcap(scapy_path("test/pcaps/doip_ack.pcap")).res[0] assert len(pkt) == 70 = dissect test of diag ACK with previous_msg field filled assert pkt.protocol_version == 0x02 assert pkt.inverse_version == 0xFD assert pkt.payload_length == 8 assert pkt.source_address == 0x4B assert pkt.target_address == 0xE00 assert pkt.ack_code == 0 assert pkt.previous_msg == b'\x22\xFD\x31' = read main pcap file pkts = rdpcap(scapy_path("test/pcaps/doip.pcap.gz")) ips = [p for p in pkts if p.proto == 6] assert len(ips) > 1 = dissect test of routing activation pkts req req = ips[0] p = req assert p.protocol_version == 0x02 assert p.inverse_version == 0xfd assert p.payload_length == 11 assert p.payload_type == 0x5 assert p.source_address == 0xe80 assert p.activation_type == 0 assert p.reserved_iso == 0 assert p.reserved_oem == b"\x00\x00\x00\x00" = dissect test of routing activation pkts resp resp = ips[1] p = resp assert p.protocol_version == 0x02 assert p.inverse_version == 0xfd assert p.payload_length == 9 assert p.payload_type == 0x6 assert p.logical_address_tester == 0xe80 assert p.logical_address_doip_entity == 0x4010 assert p.routing_activation_response == 16 assert p.reserved_iso == 0 = answers test of routing activation pkts assert resp.answers(req) assert resp.hashret() == req.hashret() = dissect diagnostic message req = ips[-4] resp = ips[-1] p = req assert p.protocol_version == 0x02 assert p.inverse_version == 0xfd assert p.payload_length == 6 assert p.payload_type == 0x8001 assert p.source_address == 0xe80 assert p.target_address == 0x4010 assert bytes(p)[-2:] == bytes(UDS()/UDS_DSC(b"\x02")) assert p.service == 0x10 assert p.diagnosticSessionType == 2 p = resp assert p.protocol_version == 0x02 assert p.inverse_version == 0xfd assert p.payload_length == 10 assert p.payload_type == 0x8001 assert p.target_address == 0xe80 assert p.source_address == 0x4010 assert bytes(p)[-6:] == bytes(UDS()/UDS_DSCPR(b"\x02\x002\x01\xf4")) assert p.service == 0x50 assert p.diagnosticSessionType == 2 assert req.hashret() == resp.hashret() # exclude TCP layer from answers check assert resp[3].answers(req[3]) assert not req[3].answers(resp[3]) = TCPSession Test tmp_file = get_temp_file() wrpcap(tmp_file, [ IP(src="10.10.10.10", dst="10.10.10.11") / TCP(sport=61000, seq=1) / DoIP(payload_type=0x8001, payload_length=6) / b"\x3E", IP(src="10.10.10.10", dst="10.10.10.11") / TCP(sport=61000, dport=13400, seq=14) / Raw(load=b"\xff") ]) pkts = sniff(offline=tmp_file, session=TCPSession) assert pkts[0].haslayer(UDS_TP) assert pkts[0].service == 0x3E = TCPSession Test multiple DoIP messages filename = scapy_path("/test/pcaps/multiple_doip_layers.pcap.gz") pkts = sniff(offline=filename, session=TCPSession) print(repr(pkts[0])) print(repr(pkts[1])) assert len(pkts) == 2 assert pkts[0][DoIP].payload_length == 2 assert pkts[0][DoIP:2].payload_length == 7 assert pkts[1][DoIP].payload_length == 103 + DoIP Communication tests = Load libraries import base64 import ssl import tempfile = Test DoIPSocket server_up = threading.Event() sniff_up = threading.Event() def server(): buffer = b'\x02\xfd\x80\x02\x00\x00\x00\x05\x00\x00\x00\x00\x00\x02\xfd\x80\x01\x00\x00\x00\n\x10\x10\x0e\x80P\x03\x002\x01\xf4' sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) try: sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.bind(('127.0.0.1', 13400)) sock.listen(1) server_up.set() connection, address = sock.accept() sniff_up.wait(timeout=1) connection.send(buffer) connection.close() finally: sock.close() server_thread = threading.Thread(target=server) server_thread.start() server_up.wait(timeout=1) sock = DoIPSocket(activate_routing=False) pkts = sock.sniff(timeout=1, count=2, started_callback=sniff_up.set) server_thread.join(timeout=1) assert len(pkts) == 2 = Test DoIPSocket 2 ~ linux server_up = threading.Event() sniff_up = threading.Event() def server(): buffer = b'\x02\xfd\x80\x02\x00\x00\x00\x05\x00\x00\x00\x00\x00\x02\xfd\x80\x01\x00\x00\x00\n\x10\x10\x0e\x80P\x03\x002\x01\xf4' sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) try: sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.bind(('127.0.0.1', 13400)) sock.listen(1) server_up.set() connection, address = sock.accept() sniff_up.wait(timeout=1) for i in range(len(buffer)): connection.send(buffer[i:i+1]) time.sleep(0.01) connection.close() finally: sock.close() server_thread = threading.Thread(target=server) server_thread.start() server_up.wait(timeout=1) sock = DoIPSocket(activate_routing=False) pkts = sock.sniff(timeout=1, count=2, started_callback=sniff_up.set) server_thread.join(timeout=1) assert len(pkts) == 2 = Test DoIPSocket 3 server_up = threading.Event() sniff_up = threading.Event() def server(): buffer = b'\x02\xfd\x80\x02\x00\x00\x00\x05\x00\x00\x00\x00\x00\x02\xfd\x80\x01\x00\x00\x00\n\x10\x10\x0e\x80P\x03\x002\x01\xf4' sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) try: sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.bind(('127.0.0.1', 13400)) sock.listen(1) server_up.set() connection, address = sock.accept() sniff_up.wait(timeout=1) while buffer: randlen = random.randint(0, len(buffer)) connection.send(buffer[:randlen]) buffer = buffer[randlen:] time.sleep(0.01) connection.close() finally: sock.close() server_thread = threading.Thread(target=server) server_thread.start() server_up.wait(timeout=1) sock = DoIPSocket(activate_routing=False) pkts = sock.sniff(timeout=1, count=2, started_callback=sniff_up.set) server_thread.join(timeout=1) assert len(pkts) == 2 = Test DoIPSocket6 server_up = threading.Event() sniff_up = threading.Event() def server(): buffer = b'\x02\xfd\x80\x02\x00\x00\x00\x05\x00\x00\x00\x00\x00\x02\xfd\x80\x01\x00\x00\x00\n\x10\x10\x0e\x80P\x03\x002\x01\xf4' sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) try: sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.bind(('::1', 13400)) sock.listen(1) server_up.set() connection, address = sock.accept() sniff_up.wait(timeout=1) connection.send(buffer) connection.close() finally: sock.close() server_thread = threading.Thread(target=server) server_thread.start() server_up.wait(timeout=1) sock = DoIPSocket(ip="::1", activate_routing=False) pkts = sock.sniff(timeout=1, count=2, started_callback=sniff_up.set) server_thread.join(timeout=1) assert len(pkts) == 2 = Test DoIPSslSocket ~ broken_windows certstring = """ LS0tLS1CRUdJTiBQUklWQVRFIEtFWS0tLS0tCk1JSUV2QUlCQURBTkJna3Foa2lHOXcwQkFRRUZB QVNDQktZd2dnU2lBZ0VBQW9JQkFRRFUvK0hRbVpzSDl2QVcKQ3ZMQjRxalpnZFJSSXE1b2JBanB4 YUhoUGxCVEMvUlBzMHIxRVF0V0FtbXNEZFE3UGlLaCtYa1hES3pNY3lJSQp1a0ZpNThUQW1idGFj N0U5VmJHSnNlTWp2RkJKSkFqQXVtbFdRZk5XcSs2TkZhdmRkTDQrSTNBTVJ5TldJTkJYCjhHMzRo dldIbDdTOGhhSFFZN0FXcUZWVTNVL2xKR2pubnF3MEJraEIvVGRCTWIwM0habzkrVjIrWU9RZmk5 QWsKTVRSRXpSeWVObWJqT0sxbHpXdFJXWkZZU0RnMEtqUVh4SkdFNVc5MzFPWitHL1NkbytTM1ZW SVRPdWxQbHRmVwpXMEdjeCsvZERSNFIxNG5mcUl5L1daMElHUVNXMlRsQytmeGJ0dURDUkFqelRz b0J3YjJ0cnpoR0VtYVFveUtNCnpBKzVSUHNyQWdNQkFBRUNnZ0VBRUJHaEoyWm5OVHh5YVY5TnZY QjI1NDNZQnRUMGVSUHBhanJLMXg0bk1OU3oKNE9LNFVzWlo1MnBnTHRHT1EzZm1aS0l0cEo1WlY1 cVBUejdwN3VjUzhnQWNZUnNJUnpCMHA5d3FpWExMK3h0RApxUjB4dnR4VDJpUGlFblVNNndudHpr SHpKK0g0QkZLT2FvdjNaK3Fha2E1UmFCcmhheGRuaDBDNklLQmZtM3cyCm5zUWI2N0lCYWwrSnBs L1g5TENWRkdRT2owb0lmVWI5ZFp3OWQ3MCthSGVVb2xvMGdYZmxxcXFFcnl3ZDlPN2QKNnp4dGlx cnRyZUJhK1IraWs3NE1SK0xvaFNVR3o2VTRQaXhWQ3l1SnQ2U0hvRHR2L3dtSnltWDd2a0FRS2w1 RQplK1JqUGVyakpUWTNzNXNXbEd2V21UTEtEbnVyS2pBYzZUOHhKb0pXWlFLQmdRRHdsd2RRdmww S28wNHhDUmtiCklYRGVJZE1jZkp2ejRGZEtka1BmVnZVT2xHVEpNZkRzbWNoUzZhcEJCQUdQMUU2 VkN2VzJmUFdjaXhScHE3MW8KR2xtbWZ5RnlJRW0rL08yamMvSFRXWHp6Qjdoc0JISEltQklHczFU TC9iWFU3amhVQW5kWDdMK3RSRDBKNWRGVwpiN1VOOXNxaWdtRG42REJWZkxaUHgxRnlWUUtCZ1FE aXBIT1BhNmVMSlk5R1FZdkw3OTIyTHNoU3ZYSUFVMERGCjBabTlqbjM2b3ZIY0kvWEZDdHVXank2 WG9wbk9pbjlycmtUY2FDUnBvSEFNb00ycHdiR0tFY0dVVEY2RHQ3akYKRHVnd2srR21sbDkrbjM2 M3Iwb09YNktSbWFhRStiZHoyNjNQVEhMaktYUnFyc3h5WEtMT3ZyTXhVNWNzMXJCeQpTMWI2ZGhr M2Z3S0JnRjlONUliMnNkS3ArQ3B5aVRCM0ljZk1yRjBuZTN1ekRjRWdjaWlCd05lQ3J4NElHNEVP Ck5nMnFKRmhXNXV0NzFaa3kyenpyNlR1VzJJSTNsdk1ySlFKUWNBWk9oZ2dURjJ2ZFhSazA1TXM4 N3JCVFhtTncKNGdzbmROck42UDZ0VTBEc0xTeDJTME91dVdNM1Y2S2U0NkRoZDBuQ3pmSnZ4dDNH WmszYURnaDFBb0dBWFhIcQpoNDZlZEx1V3VDUGNUTWhvUkc1RGdBSEdHQ1k3UlpTbTY4WHRZVUov c0FGUG10OWdMRko2cG1DUFE5NU1yUXdjCkxqZnVFM0xuMy8wSTd0NENvbWV4eGNBN0U5blRIOFNH clVpN3QrQzJITklNQUJZUTFaNU91L042K2Nhd0FkL28KYU5rZllWTzlRU015L2svOWZIcWFEVk5t dUVFSVhRZDlKQ1UvUG1jQ2dZQWI0RTBRWTdDZmlrV293OFIzSlhoZgo0MHFVVkdud09QKzJNbXE5 d2ZmWkpTRHNFSTQvb2g0VGRnN0sybHNNazVsWnRaMyszTjljSDVUc1pMYlJtd2FMCm9sRVl6K1BB WU91MlMrY1l2bFlNL0V2WmlpRHJybjZuTStNbTNnaXJPYkNwMzcxd1ZxRFVsUnB4OUlwWVdYcnAK T3YxUXFHdXkwODdyQkk1cStWL3hqQT09Ci0tLS0tRU5EIFBSSVZBVEUgS0VZLS0tLS0KLS0tLS1C RUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUQ3VENDQXRXZ0F3SUJBZ0lVVTNsendsTVNSa294Tkdk SFJzZllIcUtxcDAwd0RRWUpLb1pJaHZjTkFRRUwKQlFBd2dZVXhDekFKQmdOVkJBWVRBa1JGTVJN d0VRWURWUVFJREFwVGIyMWxMVk4wWVhSbE1Rd3dDZ1lEVlFRSApEQU5TUlVjeEVUQVBCZ05WQkFv TUNHUnBjM05sWTNSdk1Rd3dDZ1lEVlFRTERBTkVSVll4RFRBTEJnTlZCQU1NCkJGUkZVMVF4SXpB aEJna3Foa2lHOXcwQkNRRVdGR052Ym5SaFkzUXRkWE5BWkdsemMyVmpMblJ2TUI0WERUSTAKTURN eE9ERTVNek13TlZvWERUSTBNRFF4TnpFNU16TXdOVm93Z1lVeEN6QUpCZ05WQkFZVEFrUkZNUk13 RVFZRApWUVFJREFwVGIyMWxMVk4wWVhSbE1Rd3dDZ1lEVlFRSERBTlNSVWN4RVRBUEJnTlZCQW9N Q0dScGMzTmxZM1J2Ck1Rd3dDZ1lEVlFRTERBTkVSVll4RFRBTEJnTlZCQU1NQkZSRlUxUXhJekFo QmdrcWhraUc5dzBCQ1FFV0ZHTnYKYm5SaFkzUXRkWE5BWkdsemMyVmpMblJ2TUlJQklqQU5CZ2tx aGtpRzl3MEJBUUVGQUFPQ0FROEFNSUlCQ2dLQwpBUUVBMVAvaDBKbWJCL2J3RmdyeXdlS28yWUhV VVNLdWFHd0k2Y1doNFQ1UVV3djBUN05LOVJFTFZnSnByQTNVCk96NGlvZmw1Rnd5c3pITWlDTHBC WXVmRXdKbTdXbk94UFZXeGliSGpJN3hRU1NRSXdMcHBWa0h6VnF2dWpSV3IKM1hTK1BpTndERWNq VmlEUVYvQnQrSWIxaDVlMHZJV2gwR093RnFoVlZOMVA1U1JvNTU2c05BWklRZjAzUVRHOQpOeDJh UGZsZHZtRGtINHZRSkRFMFJNMGNualptNHppdFpjMXJVVm1SV0VnNE5DbzBGOFNSaE9WdmQ5VG1m aHYwCm5hUGt0MVZTRXpycFQ1YlgxbHRCbk1mdjNRMGVFZGVKMzZpTXYxbWRDQmtFbHRrNVF2bjhX N2Jnd2tRSTgwN0sKQWNHOXJhODRSaEpta0tNaWpNd1B1VVQ3S3dJREFRQUJvMU13VVRBZEJnTlZI UTRFRmdRVVZhbUFkUjR1ZW8zQgpmV0RjUlMyUkQ3OEtlZXd3SHdZRFZSMGpCQmd3Rm9BVVZhbUFk UjR1ZW8zQmZXRGNSUzJSRDc4S2Vld3dEd1lEClZSMFRBUUgvQkFVd0F3RUIvekFOQmdrcWhraUc5 dzBCQVFzRkFBT0NBUUVBRjE1TTNvL3RyUVdYeHdHamlxZjgKNXBUTEM0bHJwQkZaTFZDbStQdHd4 aENlN1ZSd2dLMElBb01EMW0vSjNEYnVJSjVURXlTVElnR2N0WHVNbG5pWgpsY3IwekZOZVVhQ08w YkdhaExYUXpCWTRxSkhTTUNWNnhiNXNqUDlEdk9HYnFxbHVTbk51ZFJ5UWNIbkd4SE0rCk1adXpO WUNseklOMEtYbFJuSTZqRXUrcG9XZ0pEMGN1NFM2b1lwT2R3bElRYmtaNnIrUE1jQ3hpRmhRd3E2 em4KcE1nQzB0WlpSM3pCOEpVcTJwRHlGVy9jVlFjWkp5YUhnQkkwWlJWWG5wbDFqYng2YlNIOCts cnMxVk1xZDlkcQozd1BMcjBheWI2VkpNa29WMjNWSXAzLzlYQVpTR3Z6Y0dadnM2VThSUTdFbUtx akJibWxudm1CTkpUMk9xbFFRCllRPT0KLS0tLS1FTkQgQ0VSVElGSUNBVEUtLS0tLQo=""" certstring = certstring.replace('\n', '') def _load_certificate_chain(context) -> None: with tempfile.NamedTemporaryFile(delete=False) as fp: fp.write(base64.b64decode(certstring)) fp.close() context.load_cert_chain(fp.name) server_up = threading.Event() sniff_up = threading.Event() def server(): context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) _load_certificate_chain(context) context.check_hostname = False context.verify_mode = ssl.CERT_NONE buffer = b'\x02\xfd\x80\x02\x00\x00\x00\x05\x00\x00\x00\x00\x00\x02\xfd\x80\x01\x00\x00\x00\n\x10\x10\x0e\x80P\x03\x002\x01\xf4' sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) ssock = context.wrap_socket(sock) try: ssock.bind(('127.0.0.1', 3496)) ssock.listen(1) server_up.set() connection, address = ssock.accept() sniff_up.wait(timeout=1) connection.send(buffer) connection.close() finally: ssock.close() server_thread = threading.Thread(target=server) server_thread.start() server_up.wait(timeout=1) context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) context.check_hostname = False context.verify_mode = ssl.CERT_NONE sock = DoIPSocket(activate_routing=False, force_tls=True, context=context) pkts = sock.sniff(timeout=1, count=2, started_callback=sniff_up.set) server_thread.join(timeout=1) assert len(pkts) == 2 = Test DoIPSslSocket6 ~ broken_windows server_up = threading.Event() sniff_up = threading.Event() def server(): context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) _load_certificate_chain(context) context.check_hostname = False context.verify_mode = ssl.CERT_NONE buffer = b'\x02\xfd\x80\x02\x00\x00\x00\x05\x00\x00\x00\x00\x00\x02\xfd\x80\x01\x00\x00\x00\n\x10\x10\x0e\x80P\x03\x002\x01\xf4' sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) ssock = context.wrap_socket(sock) try: ssock.bind(('::1', 3496)) ssock.listen(1) server_up.set() connection, address = ssock.accept() sniff_up.wait(timeout=1) connection.send(buffer) connection.close() finally: ssock.close() server_thread = threading.Thread(target=server) server_thread.start() server_up.wait(timeout=1) context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) context.check_hostname = False context.verify_mode = ssl.CERT_NONE sock = DoIPSocket(ip="::1", activate_routing=False, force_tls=True, context=context) pkts = sock.sniff(timeout=1, count=2, started_callback=sniff_up.set) server_thread.join(timeout=1) assert len(pkts) == 2 = Test UDS_DoIPSslSocket6 ~ broken_windows server_up = threading.Event() sniff_up = threading.Event() def server(): context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) _load_certificate_chain(context) context.check_hostname = False context.verify_mode = ssl.CERT_NONE buffer = b'\x02\xfd\x80\x02\x00\x00\x00\x05\x00\x00\x00\x00\x00\x02\xfd\x80\x01\x00\x00\x00\n\x10\x10\x0e\x80P\x03\x002\x01\xf4' sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) ssock = context.wrap_socket(sock) try: ssock.bind(('::1', 3496)) ssock.listen(1) server_up.set() connection, address = ssock.accept() sniff_up.wait(timeout=1) connection.send(buffer) connection.close() finally: ssock.close() server_thread = threading.Thread(target=server) server_thread.start() server_up.wait(timeout=1) context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) context.check_hostname = False context.verify_mode = ssl.CERT_NONE sock = UDS_DoIPSocket(ip="::1", activate_routing=False, force_tls=True, context=context) pkts = sock.sniff(timeout=1, count=2, started_callback=sniff_up.set) server_thread.join(timeout=1) assert len(pkts) == 2 = Test UDS_DualDoIPSslSocket6 ~ broken_windows not_pypy server_tcp_up = threading.Event() server_tls_up = threading.Event() sniff_up = threading.Event() def server_tls(): context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) _load_certificate_chain(context) context.check_hostname = False context.verify_mode = ssl.CERT_NONE buffer = bytes.fromhex("02fd0006000000090e8011061000000000") buffer += b'\x02\xfd\x80\x02\x00\x00\x00\x05\x00\x00\x00\x00\x00\x02\xfd\x80\x01\x00\x00\x00\n\x10\x10\x0e\x80P\x03\x002\x01\xf4' sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) ssock = context.wrap_socket(sock) try: ssock.bind(('::1', 3496)) ssock.listen(1) server_tls_up.set() connection, address = ssock.accept() sniff_up.wait(timeout=1) connection.send(buffer) connection.close() finally: ssock.close() def server_tcp(): buffer = bytes.fromhex("02fd0006000000090e8011060700000000") sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) try: sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.bind(('::1', 13400)) sock.listen(1) server_tcp_up.set() connection, address = sock.accept() connection.send(buffer) connection.shutdown(socket.SHUT_RDWR) connection.close() finally: sock.close() server_tcp_thread = threading.Thread(target=server_tcp) server_tcp_thread.start() server_tcp_up.wait(timeout=1) server_tls_thread = threading.Thread(target=server_tls) server_tls_thread.start() server_tls_up.wait(timeout=1) context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) context.check_hostname = False context.verify_mode = ssl.CERT_NONE sock = UDS_DoIPSocket(ip="::1", context=context) pkts = sock.sniff(timeout=1, count=2, started_callback=sniff_up.set) server_tcp_thread.join(timeout=1) server_tls_thread.join(timeout=1) assert len(pkts) == 2