1% Regression tests for the DoIP layer 2 3# More information at http://www.secdev.org/projects/UTscapy/ 4 5 6############ 7############ 8 9+ Doip contrib tests 10 11= Load Contrib Layer 12 13load_contrib("automotive.doip", globals_dict=globals()) 14load_contrib("automotive.uds", globals_dict=globals()) 15 16= Defaults test 17 18p = DoIP(payload_type=1) 19 20assert p.protocol_version == 0x02 21assert p.inverse_version == 0xfd 22assert p.payload_length == None 23assert p.payload_type == 1 24 25= Build test 0 26 27p = DoIP(bytes(DoIP(payload_type=0))) 28 29assert p.protocol_version == 0x02 30assert p.inverse_version == 0xfd 31assert p.payload_length == 1 32assert p.payload_type == 0 33assert p.nack == 0 34 35= Build test 1 36 37p = DoIP(bytes(DoIP(payload_type=1))) 38 39assert p.protocol_version == 0x02 40assert p.inverse_version == 0xfd 41assert p.payload_length == 0 42assert p.payload_type == 1 43 44= Build test 2 45 46p = DoIP(bytes(DoIP(payload_type=2))) 47 48assert p.protocol_version == 0x02 49assert p.inverse_version == 0xfd 50assert p.payload_length == 6 51assert p.payload_type == 2 52assert bytes(p.eid) == b"\x00" * 6 53 54= Build test 3 55 56p = DoIP(bytes(DoIP(payload_type=3))) 57 58assert p.protocol_version == 0x02 59assert p.inverse_version == 0xfd 60assert p.payload_length == 17 61assert p.payload_type == 3 62assert bytes(p.vin) == b"\x00" * 17 63 64= Build test 4 65 66p = DoIP(bytes(DoIP(payload_type=4))) 67 68assert p.protocol_version == 0x02 69assert p.inverse_version == 0xfd 70assert p.payload_length == 33 71assert p.payload_type == 4 72assert bytes(p.vin) == b"\x00" * 17 73assert p.logical_address == 0 74assert bytes(p.eid) == b"\x00" * 6 75assert bytes(p.gid) == b"\x00" * 6 76assert p.further_action == 0 77assert p.vin_gid_status == 0 78 79= Build test 5 80 81p = DoIP(bytes(DoIP(payload_type=5))) 82 83assert p.protocol_version == 0x02 84assert p.inverse_version == 0xfd 85assert p.payload_length == 7 86assert p.payload_type == 5 87assert p.source_address == 0 88assert p.activation_type == 0 89assert p.reserved_iso == 0 90assert p.reserved_oem == b"" 91 92= Build test 5.1 93 94p = DoIP(bytes(DoIP(payload_type=5, reserved_oem=b"1234"))) 95 96assert p.protocol_version == 0x02 97assert p.inverse_version == 0xfd 98assert p.payload_length == 11 99assert p.payload_type == 5 100assert p.source_address == 0 101assert p.activation_type == 0 102assert p.reserved_iso == 0 103p.show() 104print(p.reserved_oem) 105assert p.reserved_oem == b"1234" 106 107= Build test 5.2 108 109p = DoIP(bytes(DoIP(payload_type=5, reserved_oem=b"12"))) 110 111assert p.protocol_version == 0x02 112assert p.inverse_version == 0xfd 113assert p.payload_length == 9 114assert p.payload_type == 5 115assert p.source_address == 0 116assert p.activation_type == 0 117assert p.reserved_iso == 0 118assert p.reserved_oem == b"12" 119 120= Build test 6 121 122p = DoIP(bytes(DoIP(payload_type=6))) 123 124assert p.protocol_version == 0x02 125assert p.inverse_version == 0xfd 126assert p.payload_length == 9 127assert p.payload_type == 6 128assert p.logical_address_tester == 0 129assert p.logical_address_doip_entity == 0 130assert p.reserved_iso == 0 131assert p.reserved_oem == b"" 132 133= Build test 7 134 135p = DoIP(bytes(DoIP(payload_type=7))) 136 137assert p.protocol_version == 0x02 138assert p.inverse_version == 0xfd 139assert p.payload_length == 0 140assert p.payload_type == 7 141 142= Build test 8 143 144p = DoIP(bytes(DoIP(payload_type=8))) 145 146assert p.protocol_version == 0x02 147assert p.inverse_version == 0xfd 148assert p.payload_length == 2 149assert p.payload_type == 8 150assert p.source_address == 0 151 152= Build test 4001 153 154p = DoIP(bytes(DoIP(payload_type=0x4001))) 155 156assert p.protocol_version == 0x02 157assert p.inverse_version == 0xfd 158assert p.payload_length == 0 159assert p.payload_type == 0x4001 160 161 162= Build test 4002 163 164p = DoIP(bytes(DoIP(payload_type=0x4002))) 165 166assert p.protocol_version == 0x02 167assert p.inverse_version == 0xfd 168assert p.payload_length == 7 169assert p.payload_type == 0x4002 170assert p.node_type == 0 171assert p.max_open_sockets == 1 172assert p.cur_open_sockets == 0 173assert p.max_data_size == 0 174 175 176= Build test 4003 177 178p = DoIP(bytes(DoIP(payload_type=0x4003))) 179 180assert p.protocol_version == 0x02 181assert p.inverse_version == 0xfd 182assert p.payload_length == 0 183assert p.payload_type == 0x4003 184 185 186= Build test 4004 187 188p = DoIP(bytes(DoIP(payload_type=0x4004))) 189 190assert p.protocol_version == 0x02 191assert p.inverse_version == 0xfd 192assert p.payload_length == 1 193assert p.payload_type == 0x4004 194assert p.diagnostic_power_mode == 0 195 196= Build test 8001 197 198p = DoIP(bytes(DoIP(payload_type=0x8001))) 199 200assert p.protocol_version == 0x02 201assert p.inverse_version == 0xfd 202assert p.payload_length == 4 203assert p.payload_type == 0x8001 204assert p.source_address == 0 205assert p.target_address == 0 206 207= Build test 8002 208 209p = DoIP(bytes(DoIP(payload_type=0x8002))) 210 211assert p.protocol_version == 0x02 212assert p.inverse_version == 0xfd 213assert p.payload_length == 5 214assert p.payload_type == 0x8002 215assert p.source_address == 0 216assert p.target_address == 0 217assert p.ack_code == 0 218assert p.previous_msg == b'' 219 220p = DoIP(bytes(DoIP(payload_type=0x8002, previous_msg=b'\x22\xfd\x32'))) 221 222assert p.protocol_version == 0x02 223assert p.inverse_version == 0xfd 224assert p.payload_length == 8 225assert p.payload_type == 0x8002 226assert p.source_address == 0 227assert p.target_address == 0 228assert p.ack_code == 0 229assert p.previous_msg == b'\x22\xfd\x32' 230 231p = DoIP(bytes(DoIP(payload_type=0x8002, previous_msg=b'\x19\x02\x09\x9C\x00'))) 232 233assert p.protocol_version == 0x02 234assert p.inverse_version == 0xfd 235assert p.payload_length == 10 236assert p.payload_type == 0x8002 237assert p.source_address == 0 238assert p.target_address == 0 239assert p.ack_code == 0 240assert p.previous_msg == b'\x19\x02\t\x9c\x00' 241 242p = DoIP(b'\x02\xfd\x80\x02\x00\x00\x00\x07\x00\x08\x00\x0e\x00\x10\x01') 243assert p.protocol_version == 0x02 244assert p.inverse_version == 0xFD 245assert p.payload_length == 7 246assert p.payload_type == 0x8002 247assert p.source_address == 0x8 248assert p.target_address == 0xE 249assert p.ack_code == 0 250assert p.previous_msg == b'\x10\x01' 251 252= Build test 8003 253 254p = DoIP(bytes(DoIP(payload_type=0x8003))) 255 256assert p.protocol_version == 0x02 257assert p.inverse_version == 0xfd 258assert p.payload_length == 5 259assert p.payload_type == 0x8003 260assert p.source_address == 0 261assert p.target_address == 0 262assert p.nack_code == 0 263 264 265p = DoIP(bytes(DoIP(payload_type=0x8003, previous_msg=b'\x2E\xfd\x32\x01\x02'))) 266 267assert p.protocol_version == 0x02 268assert p.inverse_version == 0xfd 269assert p.payload_length == 10 270assert p.payload_type == 0x8003 271assert p.source_address == 0 272assert p.target_address == 0 273assert p.nack_code == 0 274assert p.previous_msg == b'.\xfd2\x01\x02' 275 276p = DoIP(bytes(DoIP(payload_type=0x8003, previous_msg=b'\x19\x02\x09\x9A\x00'))) 277 278assert p.protocol_version == 0x02 279assert p.inverse_version == 0xfd 280assert p.payload_length == 10 281assert p.payload_type == 0x8003 282assert p.source_address == 0 283assert p.target_address == 0 284assert p.nack_code == 0 285assert p.previous_msg == b'\x19\x02\t\x9a\x00' 286 287p = DoIP(b'\x02\xfd\x80\x03\x00\x00\x00\x07\x00\x0A\x00\x0C\x00\x10\x03') 288assert p.protocol_version == 0x02 289assert p.inverse_version == 0xFD 290assert p.payload_length == 7 291assert p.payload_type == 0x8003 292assert p.source_address == 0xA 293assert p.target_address == 0xC 294assert p.nack_code == 0 295assert p.previous_msg == b'\x10\x03' 296 297+ pcap based tests 298 299= read diag_ack pcap file 300pkt = rdpcap(scapy_path("test/pcaps/doip_ack.pcap")).res[0] 301 302assert len(pkt) == 70 303 304= dissect test of diag ACK with previous_msg field filled 305assert pkt.protocol_version == 0x02 306assert pkt.inverse_version == 0xFD 307assert pkt.payload_length == 8 308assert pkt.source_address == 0x4B 309assert pkt.target_address == 0xE00 310assert pkt.ack_code == 0 311assert pkt.previous_msg == b'\x22\xFD\x31' 312 313 314= read main pcap file 315 316pkts = rdpcap(scapy_path("test/pcaps/doip.pcap.gz")) 317ips = [p for p in pkts if p.proto == 6] 318 319assert len(ips) > 1 320 321= dissect test of routing activation pkts req 322 323req = ips[0] 324p = req 325assert p.protocol_version == 0x02 326assert p.inverse_version == 0xfd 327assert p.payload_length == 11 328assert p.payload_type == 0x5 329assert p.source_address == 0xe80 330assert p.activation_type == 0 331assert p.reserved_iso == 0 332assert p.reserved_oem == b"\x00\x00\x00\x00" 333 334= dissect test of routing activation pkts resp 335 336resp = ips[1] 337p = resp 338assert p.protocol_version == 0x02 339assert p.inverse_version == 0xfd 340assert p.payload_length == 9 341assert p.payload_type == 0x6 342assert p.logical_address_tester == 0xe80 343assert p.logical_address_doip_entity == 0x4010 344assert p.routing_activation_response == 16 345assert p.reserved_iso == 0 346 347= answers test of routing activation pkts 348 349assert resp.answers(req) 350assert resp.hashret() == req.hashret() 351 352= dissect diagnostic message 353 354req = ips[-4] 355resp = ips[-1] 356 357p = req 358assert p.protocol_version == 0x02 359assert p.inverse_version == 0xfd 360assert p.payload_length == 6 361assert p.payload_type == 0x8001 362assert p.source_address == 0xe80 363assert p.target_address == 0x4010 364assert bytes(p)[-2:] == bytes(UDS()/UDS_DSC(b"\x02")) 365assert p.service == 0x10 366assert p.diagnosticSessionType == 2 367 368p = resp 369assert p.protocol_version == 0x02 370assert p.inverse_version == 0xfd 371assert p.payload_length == 10 372assert p.payload_type == 0x8001 373assert p.target_address == 0xe80 374assert p.source_address == 0x4010 375assert bytes(p)[-6:] == bytes(UDS()/UDS_DSCPR(b"\x02\x002\x01\xf4")) 376assert p.service == 0x50 377assert p.diagnosticSessionType == 2 378 379assert req.hashret() == resp.hashret() 380# exclude TCP layer from answers check 381assert resp[3].answers(req[3]) 382assert not req[3].answers(resp[3]) 383 384= TCPSession Test 385 386tmp_file = get_temp_file() 387 388wrpcap(tmp_file, [ 389 IP(src="10.10.10.10", dst="10.10.10.11") / TCP(sport=61000, seq=1) / DoIP(payload_type=0x8001, payload_length=6) / b"\x3E", 390 IP(src="10.10.10.10", dst="10.10.10.11") / TCP(sport=61000, dport=13400, seq=14) / Raw(load=b"\xff") 391]) 392 393pkts = sniff(offline=tmp_file, session=TCPSession) 394assert pkts[0].haslayer(UDS_TP) 395assert pkts[0].service == 0x3E 396 397= TCPSession Test multiple DoIP messages 398 399filename = scapy_path("/test/pcaps/multiple_doip_layers.pcap.gz") 400 401pkts = sniff(offline=filename, session=TCPSession) 402print(repr(pkts[0])) 403print(repr(pkts[1])) 404assert len(pkts) == 2 405assert pkts[0][DoIP].payload_length == 2 406assert pkts[0][DoIP:2].payload_length == 7 407assert pkts[1][DoIP].payload_length == 103 408 409+ DoIP Communication tests 410 411= Load libraries 412import base64 413import ssl 414import tempfile 415 416= Test DoIPSocket 417 418server_up = threading.Event() 419sniff_up = threading.Event() 420def server(): 421 buffer = b'\x02\xfd\x80\x02\x00\x00\x00\x05\x00\x00\x00\x00\x00\x02\xfd\x80\x01\x00\x00\x00\n\x10\x10\x0e\x80P\x03\x002\x01\xf4' 422 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 423 try: 424 sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) 425 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 426 sock.bind(('127.0.0.1', 13400)) 427 sock.listen(1) 428 server_up.set() 429 connection, address = sock.accept() 430 sniff_up.wait(timeout=1) 431 connection.send(buffer) 432 connection.close() 433 finally: 434 sock.close() 435 436 437server_thread = threading.Thread(target=server) 438server_thread.start() 439server_up.wait(timeout=1) 440sock = DoIPSocket(activate_routing=False) 441 442pkts = sock.sniff(timeout=1, count=2, started_callback=sniff_up.set) 443server_thread.join(timeout=1) 444assert len(pkts) == 2 445 446 447= Test DoIPSocket 2 448~ linux 449 450server_up = threading.Event() 451sniff_up = threading.Event() 452def server(): 453 buffer = b'\x02\xfd\x80\x02\x00\x00\x00\x05\x00\x00\x00\x00\x00\x02\xfd\x80\x01\x00\x00\x00\n\x10\x10\x0e\x80P\x03\x002\x01\xf4' 454 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 455 try: 456 sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) 457 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 458 sock.bind(('127.0.0.1', 13400)) 459 sock.listen(1) 460 server_up.set() 461 connection, address = sock.accept() 462 sniff_up.wait(timeout=1) 463 for i in range(len(buffer)): 464 connection.send(buffer[i:i+1]) 465 time.sleep(0.01) 466 connection.close() 467 finally: 468 sock.close() 469 470 471server_thread = threading.Thread(target=server) 472server_thread.start() 473server_up.wait(timeout=1) 474sock = DoIPSocket(activate_routing=False) 475 476pkts = sock.sniff(timeout=1, count=2, started_callback=sniff_up.set) 477server_thread.join(timeout=1) 478assert len(pkts) == 2 479 480= Test DoIPSocket 3 481 482server_up = threading.Event() 483sniff_up = threading.Event() 484def server(): 485 buffer = b'\x02\xfd\x80\x02\x00\x00\x00\x05\x00\x00\x00\x00\x00\x02\xfd\x80\x01\x00\x00\x00\n\x10\x10\x0e\x80P\x03\x002\x01\xf4' 486 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 487 try: 488 sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) 489 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 490 sock.bind(('127.0.0.1', 13400)) 491 sock.listen(1) 492 server_up.set() 493 connection, address = sock.accept() 494 sniff_up.wait(timeout=1) 495 while buffer: 496 randlen = random.randint(0, len(buffer)) 497 connection.send(buffer[:randlen]) 498 buffer = buffer[randlen:] 499 time.sleep(0.01) 500 connection.close() 501 finally: 502 sock.close() 503 504 505server_thread = threading.Thread(target=server) 506server_thread.start() 507server_up.wait(timeout=1) 508sock = DoIPSocket(activate_routing=False) 509 510pkts = sock.sniff(timeout=1, count=2, started_callback=sniff_up.set) 511server_thread.join(timeout=1) 512assert len(pkts) == 2 513 514 515= Test DoIPSocket6 516 517server_up = threading.Event() 518sniff_up = threading.Event() 519def server(): 520 buffer = b'\x02\xfd\x80\x02\x00\x00\x00\x05\x00\x00\x00\x00\x00\x02\xfd\x80\x01\x00\x00\x00\n\x10\x10\x0e\x80P\x03\x002\x01\xf4' 521 sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) 522 try: 523 sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) 524 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 525 sock.bind(('::1', 13400)) 526 sock.listen(1) 527 server_up.set() 528 connection, address = sock.accept() 529 sniff_up.wait(timeout=1) 530 connection.send(buffer) 531 connection.close() 532 finally: 533 sock.close() 534 535 536server_thread = threading.Thread(target=server) 537server_thread.start() 538server_up.wait(timeout=1) 539sock = DoIPSocket(ip="::1", activate_routing=False) 540 541pkts = sock.sniff(timeout=1, count=2, started_callback=sniff_up.set) 542server_thread.join(timeout=1) 543assert len(pkts) == 2 544 545= Test DoIPSslSocket 546~ broken_windows 547 548certstring = """ 549LS0tLS1CRUdJTiBQUklWQVRFIEtFWS0tLS0tCk1JSUV2QUlCQURBTkJna3Foa2lHOXcwQkFRRUZB 550QVNDQktZd2dnU2lBZ0VBQW9JQkFRRFUvK0hRbVpzSDl2QVcKQ3ZMQjRxalpnZFJSSXE1b2JBanB4 551YUhoUGxCVEMvUlBzMHIxRVF0V0FtbXNEZFE3UGlLaCtYa1hES3pNY3lJSQp1a0ZpNThUQW1idGFj 552N0U5VmJHSnNlTWp2RkJKSkFqQXVtbFdRZk5XcSs2TkZhdmRkTDQrSTNBTVJ5TldJTkJYCjhHMzRo 553dldIbDdTOGhhSFFZN0FXcUZWVTNVL2xKR2pubnF3MEJraEIvVGRCTWIwM0habzkrVjIrWU9RZmk5 554QWsKTVRSRXpSeWVObWJqT0sxbHpXdFJXWkZZU0RnMEtqUVh4SkdFNVc5MzFPWitHL1NkbytTM1ZW 555SVRPdWxQbHRmVwpXMEdjeCsvZERSNFIxNG5mcUl5L1daMElHUVNXMlRsQytmeGJ0dURDUkFqelRz 556b0J3YjJ0cnpoR0VtYVFveUtNCnpBKzVSUHNyQWdNQkFBRUNnZ0VBRUJHaEoyWm5OVHh5YVY5TnZY 557QjI1NDNZQnRUMGVSUHBhanJLMXg0bk1OU3oKNE9LNFVzWlo1MnBnTHRHT1EzZm1aS0l0cEo1WlY1 558cVBUejdwN3VjUzhnQWNZUnNJUnpCMHA5d3FpWExMK3h0RApxUjB4dnR4VDJpUGlFblVNNndudHpr 559SHpKK0g0QkZLT2FvdjNaK3Fha2E1UmFCcmhheGRuaDBDNklLQmZtM3cyCm5zUWI2N0lCYWwrSnBs 560L1g5TENWRkdRT2owb0lmVWI5ZFp3OWQ3MCthSGVVb2xvMGdYZmxxcXFFcnl3ZDlPN2QKNnp4dGlx 561cnRyZUJhK1IraWs3NE1SK0xvaFNVR3o2VTRQaXhWQ3l1SnQ2U0hvRHR2L3dtSnltWDd2a0FRS2w1 562RQplK1JqUGVyakpUWTNzNXNXbEd2V21UTEtEbnVyS2pBYzZUOHhKb0pXWlFLQmdRRHdsd2RRdmww 563S28wNHhDUmtiCklYRGVJZE1jZkp2ejRGZEtka1BmVnZVT2xHVEpNZkRzbWNoUzZhcEJCQUdQMUU2 564VkN2VzJmUFdjaXhScHE3MW8KR2xtbWZ5RnlJRW0rL08yamMvSFRXWHp6Qjdoc0JISEltQklHczFU 565TC9iWFU3amhVQW5kWDdMK3RSRDBKNWRGVwpiN1VOOXNxaWdtRG42REJWZkxaUHgxRnlWUUtCZ1FE 566aXBIT1BhNmVMSlk5R1FZdkw3OTIyTHNoU3ZYSUFVMERGCjBabTlqbjM2b3ZIY0kvWEZDdHVXank2 567WG9wbk9pbjlycmtUY2FDUnBvSEFNb00ycHdiR0tFY0dVVEY2RHQ3akYKRHVnd2srR21sbDkrbjM2 568M3Iwb09YNktSbWFhRStiZHoyNjNQVEhMaktYUnFyc3h5WEtMT3ZyTXhVNWNzMXJCeQpTMWI2ZGhr 569M2Z3S0JnRjlONUliMnNkS3ArQ3B5aVRCM0ljZk1yRjBuZTN1ekRjRWdjaWlCd05lQ3J4NElHNEVP 570Ck5nMnFKRmhXNXV0NzFaa3kyenpyNlR1VzJJSTNsdk1ySlFKUWNBWk9oZ2dURjJ2ZFhSazA1TXM4 571N3JCVFhtTncKNGdzbmROck42UDZ0VTBEc0xTeDJTME91dVdNM1Y2S2U0NkRoZDBuQ3pmSnZ4dDNH 572WmszYURnaDFBb0dBWFhIcQpoNDZlZEx1V3VDUGNUTWhvUkc1RGdBSEdHQ1k3UlpTbTY4WHRZVUov 573c0FGUG10OWdMRko2cG1DUFE5NU1yUXdjCkxqZnVFM0xuMy8wSTd0NENvbWV4eGNBN0U5blRIOFNH 574clVpN3QrQzJITklNQUJZUTFaNU91L042K2Nhd0FkL28KYU5rZllWTzlRU015L2svOWZIcWFEVk5t 575dUVFSVhRZDlKQ1UvUG1jQ2dZQWI0RTBRWTdDZmlrV293OFIzSlhoZgo0MHFVVkdud09QKzJNbXE5 576d2ZmWkpTRHNFSTQvb2g0VGRnN0sybHNNazVsWnRaMyszTjljSDVUc1pMYlJtd2FMCm9sRVl6K1BB 577WU91MlMrY1l2bFlNL0V2WmlpRHJybjZuTStNbTNnaXJPYkNwMzcxd1ZxRFVsUnB4OUlwWVdYcnAK 578T3YxUXFHdXkwODdyQkk1cStWL3hqQT09Ci0tLS0tRU5EIFBSSVZBVEUgS0VZLS0tLS0KLS0tLS1C 579RUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUQ3VENDQXRXZ0F3SUJBZ0lVVTNsendsTVNSa294Tkdk 580SFJzZllIcUtxcDAwd0RRWUpLb1pJaHZjTkFRRUwKQlFBd2dZVXhDekFKQmdOVkJBWVRBa1JGTVJN 581d0VRWURWUVFJREFwVGIyMWxMVk4wWVhSbE1Rd3dDZ1lEVlFRSApEQU5TUlVjeEVUQVBCZ05WQkFv 582TUNHUnBjM05sWTNSdk1Rd3dDZ1lEVlFRTERBTkVSVll4RFRBTEJnTlZCQU1NCkJGUkZVMVF4SXpB 583aEJna3Foa2lHOXcwQkNRRVdGR052Ym5SaFkzUXRkWE5BWkdsemMyVmpMblJ2TUI0WERUSTAKTURN 584eE9ERTVNek13TlZvWERUSTBNRFF4TnpFNU16TXdOVm93Z1lVeEN6QUpCZ05WQkFZVEFrUkZNUk13 585RVFZRApWUVFJREFwVGIyMWxMVk4wWVhSbE1Rd3dDZ1lEVlFRSERBTlNSVWN4RVRBUEJnTlZCQW9N 586Q0dScGMzTmxZM1J2Ck1Rd3dDZ1lEVlFRTERBTkVSVll4RFRBTEJnTlZCQU1NQkZSRlUxUXhJekFo 587QmdrcWhraUc5dzBCQ1FFV0ZHTnYKYm5SaFkzUXRkWE5BWkdsemMyVmpMblJ2TUlJQklqQU5CZ2tx 588aGtpRzl3MEJBUUVGQUFPQ0FROEFNSUlCQ2dLQwpBUUVBMVAvaDBKbWJCL2J3RmdyeXdlS28yWUhV 589VVNLdWFHd0k2Y1doNFQ1UVV3djBUN05LOVJFTFZnSnByQTNVCk96NGlvZmw1Rnd5c3pITWlDTHBC 590WXVmRXdKbTdXbk94UFZXeGliSGpJN3hRU1NRSXdMcHBWa0h6VnF2dWpSV3IKM1hTK1BpTndERWNq 591VmlEUVYvQnQrSWIxaDVlMHZJV2gwR093RnFoVlZOMVA1U1JvNTU2c05BWklRZjAzUVRHOQpOeDJh 592UGZsZHZtRGtINHZRSkRFMFJNMGNualptNHppdFpjMXJVVm1SV0VnNE5DbzBGOFNSaE9WdmQ5VG1m 593aHYwCm5hUGt0MVZTRXpycFQ1YlgxbHRCbk1mdjNRMGVFZGVKMzZpTXYxbWRDQmtFbHRrNVF2bjhX 594N2Jnd2tRSTgwN0sKQWNHOXJhODRSaEpta0tNaWpNd1B1VVQ3S3dJREFRQUJvMU13VVRBZEJnTlZI 595UTRFRmdRVVZhbUFkUjR1ZW8zQgpmV0RjUlMyUkQ3OEtlZXd3SHdZRFZSMGpCQmd3Rm9BVVZhbUFk 596UjR1ZW8zQmZXRGNSUzJSRDc4S2Vld3dEd1lEClZSMFRBUUgvQkFVd0F3RUIvekFOQmdrcWhraUc5 597dzBCQVFzRkFBT0NBUUVBRjE1TTNvL3RyUVdYeHdHamlxZjgKNXBUTEM0bHJwQkZaTFZDbStQdHd4 598aENlN1ZSd2dLMElBb01EMW0vSjNEYnVJSjVURXlTVElnR2N0WHVNbG5pWgpsY3IwekZOZVVhQ08w 599YkdhaExYUXpCWTRxSkhTTUNWNnhiNXNqUDlEdk9HYnFxbHVTbk51ZFJ5UWNIbkd4SE0rCk1adXpO 600WUNseklOMEtYbFJuSTZqRXUrcG9XZ0pEMGN1NFM2b1lwT2R3bElRYmtaNnIrUE1jQ3hpRmhRd3E2 601em4KcE1nQzB0WlpSM3pCOEpVcTJwRHlGVy9jVlFjWkp5YUhnQkkwWlJWWG5wbDFqYng2YlNIOCts 602cnMxVk1xZDlkcQozd1BMcjBheWI2VkpNa29WMjNWSXAzLzlYQVpTR3Z6Y0dadnM2VThSUTdFbUtx 603akJibWxudm1CTkpUMk9xbFFRCllRPT0KLS0tLS1FTkQgQ0VSVElGSUNBVEUtLS0tLQo=""" 604 605certstring = certstring.replace('\n', '') 606 607def _load_certificate_chain(context) -> None: 608 with tempfile.NamedTemporaryFile(delete=False) as fp: 609 fp.write(base64.b64decode(certstring)) 610 fp.close() 611 context.load_cert_chain(fp.name) 612 613 614server_up = threading.Event() 615sniff_up = threading.Event() 616def server(): 617 context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) 618 _load_certificate_chain(context) 619 context.check_hostname = False 620 context.verify_mode = ssl.CERT_NONE 621 buffer = b'\x02\xfd\x80\x02\x00\x00\x00\x05\x00\x00\x00\x00\x00\x02\xfd\x80\x01\x00\x00\x00\n\x10\x10\x0e\x80P\x03\x002\x01\xf4' 622 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 623 sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) 624 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 625 ssock = context.wrap_socket(sock) 626 try: 627 ssock.bind(('127.0.0.1', 3496)) 628 ssock.listen(1) 629 server_up.set() 630 connection, address = ssock.accept() 631 sniff_up.wait(timeout=1) 632 connection.send(buffer) 633 connection.close() 634 finally: 635 ssock.close() 636 637 638server_thread = threading.Thread(target=server) 639server_thread.start() 640server_up.wait(timeout=1) 641context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) 642context.check_hostname = False 643context.verify_mode = ssl.CERT_NONE 644sock = DoIPSocket(activate_routing=False, force_tls=True, context=context) 645 646pkts = sock.sniff(timeout=1, count=2, started_callback=sniff_up.set) 647server_thread.join(timeout=1) 648assert len(pkts) == 2 649 650= Test DoIPSslSocket6 651~ broken_windows 652 653server_up = threading.Event() 654sniff_up = threading.Event() 655def server(): 656 context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) 657 _load_certificate_chain(context) 658 context.check_hostname = False 659 context.verify_mode = ssl.CERT_NONE 660 buffer = b'\x02\xfd\x80\x02\x00\x00\x00\x05\x00\x00\x00\x00\x00\x02\xfd\x80\x01\x00\x00\x00\n\x10\x10\x0e\x80P\x03\x002\x01\xf4' 661 sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) 662 sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) 663 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 664 ssock = context.wrap_socket(sock) 665 try: 666 ssock.bind(('::1', 3496)) 667 ssock.listen(1) 668 server_up.set() 669 connection, address = ssock.accept() 670 sniff_up.wait(timeout=1) 671 connection.send(buffer) 672 connection.close() 673 finally: 674 ssock.close() 675 676 677server_thread = threading.Thread(target=server) 678server_thread.start() 679server_up.wait(timeout=1) 680context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) 681context.check_hostname = False 682context.verify_mode = ssl.CERT_NONE 683sock = DoIPSocket(ip="::1", activate_routing=False, force_tls=True, context=context) 684 685pkts = sock.sniff(timeout=1, count=2, started_callback=sniff_up.set) 686server_thread.join(timeout=1) 687assert len(pkts) == 2 688 689= Test UDS_DoIPSslSocket6 690~ broken_windows 691 692server_up = threading.Event() 693sniff_up = threading.Event() 694def server(): 695 context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) 696 _load_certificate_chain(context) 697 context.check_hostname = False 698 context.verify_mode = ssl.CERT_NONE 699 buffer = b'\x02\xfd\x80\x02\x00\x00\x00\x05\x00\x00\x00\x00\x00\x02\xfd\x80\x01\x00\x00\x00\n\x10\x10\x0e\x80P\x03\x002\x01\xf4' 700 sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) 701 sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) 702 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 703 ssock = context.wrap_socket(sock) 704 try: 705 ssock.bind(('::1', 3496)) 706 ssock.listen(1) 707 server_up.set() 708 connection, address = ssock.accept() 709 sniff_up.wait(timeout=1) 710 connection.send(buffer) 711 connection.close() 712 finally: 713 ssock.close() 714 715 716server_thread = threading.Thread(target=server) 717server_thread.start() 718server_up.wait(timeout=1) 719context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) 720context.check_hostname = False 721context.verify_mode = ssl.CERT_NONE 722sock = UDS_DoIPSocket(ip="::1", activate_routing=False, force_tls=True, context=context) 723 724pkts = sock.sniff(timeout=1, count=2, started_callback=sniff_up.set) 725server_thread.join(timeout=1) 726assert len(pkts) == 2 727 728= Test UDS_DualDoIPSslSocket6 729~ broken_windows not_pypy 730 731server_tcp_up = threading.Event() 732server_tls_up = threading.Event() 733sniff_up = threading.Event() 734def server_tls(): 735 context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) 736 _load_certificate_chain(context) 737 context.check_hostname = False 738 context.verify_mode = ssl.CERT_NONE 739 buffer = bytes.fromhex("02fd0006000000090e8011061000000000") 740 buffer += b'\x02\xfd\x80\x02\x00\x00\x00\x05\x00\x00\x00\x00\x00\x02\xfd\x80\x01\x00\x00\x00\n\x10\x10\x0e\x80P\x03\x002\x01\xf4' 741 sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) 742 sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) 743 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 744 ssock = context.wrap_socket(sock) 745 try: 746 ssock.bind(('::1', 3496)) 747 ssock.listen(1) 748 server_tls_up.set() 749 connection, address = ssock.accept() 750 sniff_up.wait(timeout=1) 751 connection.send(buffer) 752 connection.close() 753 finally: 754 ssock.close() 755 756def server_tcp(): 757 buffer = bytes.fromhex("02fd0006000000090e8011060700000000") 758 sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) 759 try: 760 sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) 761 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 762 sock.bind(('::1', 13400)) 763 sock.listen(1) 764 server_tcp_up.set() 765 connection, address = sock.accept() 766 connection.send(buffer) 767 connection.shutdown(socket.SHUT_RDWR) 768 connection.close() 769 finally: 770 sock.close() 771 772 773server_tcp_thread = threading.Thread(target=server_tcp) 774server_tcp_thread.start() 775server_tcp_up.wait(timeout=1) 776server_tls_thread = threading.Thread(target=server_tls) 777server_tls_thread.start() 778server_tls_up.wait(timeout=1) 779context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) 780context.check_hostname = False 781context.verify_mode = ssl.CERT_NONE 782 783 784sock = UDS_DoIPSocket(ip="::1", context=context) 785 786pkts = sock.sniff(timeout=1, count=2, started_callback=sniff_up.set) 787server_tcp_thread.join(timeout=1) 788server_tls_thread.join(timeout=1) 789assert len(pkts) == 2 790