• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1% Regression tests for the DoIP layer
2
3# More information at http://www.secdev.org/projects/UTscapy/
4
5
6############
7############
8
9+ Doip contrib tests
10
11= Load Contrib Layer
12
13load_contrib("automotive.doip", globals_dict=globals())
14load_contrib("automotive.uds", globals_dict=globals())
15
16= Defaults test
17
18p = DoIP(payload_type=1)
19
20assert p.protocol_version == 0x02
21assert p.inverse_version == 0xfd
22assert p.payload_length == None
23assert p.payload_type == 1
24
25= Build test 0
26
27p = DoIP(bytes(DoIP(payload_type=0)))
28
29assert p.protocol_version == 0x02
30assert p.inverse_version == 0xfd
31assert p.payload_length == 1
32assert p.payload_type == 0
33assert p.nack == 0
34
35= Build test 1
36
37p = DoIP(bytes(DoIP(payload_type=1)))
38
39assert p.protocol_version == 0x02
40assert p.inverse_version == 0xfd
41assert p.payload_length == 0
42assert p.payload_type == 1
43
44= Build test 2
45
46p = DoIP(bytes(DoIP(payload_type=2)))
47
48assert p.protocol_version == 0x02
49assert p.inverse_version == 0xfd
50assert p.payload_length == 6
51assert p.payload_type == 2
52assert bytes(p.eid) == b"\x00" * 6
53
54= Build test 3
55
56p = DoIP(bytes(DoIP(payload_type=3)))
57
58assert p.protocol_version == 0x02
59assert p.inverse_version == 0xfd
60assert p.payload_length == 17
61assert p.payload_type == 3
62assert bytes(p.vin) == b"\x00" * 17
63
64= Build test 4
65
66p = DoIP(bytes(DoIP(payload_type=4)))
67
68assert p.protocol_version == 0x02
69assert p.inverse_version == 0xfd
70assert p.payload_length == 33
71assert p.payload_type == 4
72assert bytes(p.vin) == b"\x00" * 17
73assert p.logical_address == 0
74assert bytes(p.eid) == b"\x00" * 6
75assert bytes(p.gid) == b"\x00" * 6
76assert p.further_action == 0
77assert p.vin_gid_status == 0
78
79= Build test 5
80
81p = DoIP(bytes(DoIP(payload_type=5)))
82
83assert p.protocol_version == 0x02
84assert p.inverse_version == 0xfd
85assert p.payload_length == 7
86assert p.payload_type == 5
87assert p.source_address == 0
88assert p.activation_type == 0
89assert p.reserved_iso == 0
90assert p.reserved_oem == b""
91
92= Build test 5.1
93
94p = DoIP(bytes(DoIP(payload_type=5, reserved_oem=b"1234")))
95
96assert p.protocol_version == 0x02
97assert p.inverse_version == 0xfd
98assert p.payload_length == 11
99assert p.payload_type == 5
100assert p.source_address == 0
101assert p.activation_type == 0
102assert p.reserved_iso == 0
103p.show()
104print(p.reserved_oem)
105assert p.reserved_oem == b"1234"
106
107= Build test 5.2
108
109p = DoIP(bytes(DoIP(payload_type=5, reserved_oem=b"12")))
110
111assert p.protocol_version == 0x02
112assert p.inverse_version == 0xfd
113assert p.payload_length == 9
114assert p.payload_type == 5
115assert p.source_address == 0
116assert p.activation_type == 0
117assert p.reserved_iso == 0
118assert p.reserved_oem == b"12"
119
120= Build test 6
121
122p = DoIP(bytes(DoIP(payload_type=6)))
123
124assert p.protocol_version == 0x02
125assert p.inverse_version == 0xfd
126assert p.payload_length == 9
127assert p.payload_type == 6
128assert p.logical_address_tester == 0
129assert p.logical_address_doip_entity == 0
130assert p.reserved_iso == 0
131assert p.reserved_oem == b""
132
133= Build test 7
134
135p = DoIP(bytes(DoIP(payload_type=7)))
136
137assert p.protocol_version == 0x02
138assert p.inverse_version == 0xfd
139assert p.payload_length == 0
140assert p.payload_type == 7
141
142= Build test 8
143
144p = DoIP(bytes(DoIP(payload_type=8)))
145
146assert p.protocol_version == 0x02
147assert p.inverse_version == 0xfd
148assert p.payload_length == 2
149assert p.payload_type == 8
150assert p.source_address == 0
151
152= Build test 4001
153
154p = DoIP(bytes(DoIP(payload_type=0x4001)))
155
156assert p.protocol_version == 0x02
157assert p.inverse_version == 0xfd
158assert p.payload_length == 0
159assert p.payload_type == 0x4001
160
161
162= Build test 4002
163
164p = DoIP(bytes(DoIP(payload_type=0x4002)))
165
166assert p.protocol_version == 0x02
167assert p.inverse_version == 0xfd
168assert p.payload_length == 7
169assert p.payload_type == 0x4002
170assert p.node_type == 0
171assert p.max_open_sockets == 1
172assert p.cur_open_sockets == 0
173assert p.max_data_size == 0
174
175
176= Build test 4003
177
178p = DoIP(bytes(DoIP(payload_type=0x4003)))
179
180assert p.protocol_version == 0x02
181assert p.inverse_version == 0xfd
182assert p.payload_length == 0
183assert p.payload_type == 0x4003
184
185
186= Build test 4004
187
188p = DoIP(bytes(DoIP(payload_type=0x4004)))
189
190assert p.protocol_version == 0x02
191assert p.inverse_version == 0xfd
192assert p.payload_length == 1
193assert p.payload_type == 0x4004
194assert p.diagnostic_power_mode == 0
195
196= Build test 8001
197
198p = DoIP(bytes(DoIP(payload_type=0x8001)))
199
200assert p.protocol_version == 0x02
201assert p.inverse_version == 0xfd
202assert p.payload_length == 4
203assert p.payload_type == 0x8001
204assert p.source_address == 0
205assert p.target_address == 0
206
207= Build test 8002
208
209p = DoIP(bytes(DoIP(payload_type=0x8002)))
210
211assert p.protocol_version == 0x02
212assert p.inverse_version == 0xfd
213assert p.payload_length == 5
214assert p.payload_type == 0x8002
215assert p.source_address == 0
216assert p.target_address == 0
217assert p.ack_code == 0
218assert p.previous_msg == b''
219
220p = DoIP(bytes(DoIP(payload_type=0x8002, previous_msg=b'\x22\xfd\x32')))
221
222assert p.protocol_version == 0x02
223assert p.inverse_version == 0xfd
224assert p.payload_length == 8
225assert p.payload_type == 0x8002
226assert p.source_address == 0
227assert p.target_address == 0
228assert p.ack_code == 0
229assert p.previous_msg == b'\x22\xfd\x32'
230
231p = DoIP(bytes(DoIP(payload_type=0x8002, previous_msg=b'\x19\x02\x09\x9C\x00')))
232
233assert p.protocol_version == 0x02
234assert p.inverse_version == 0xfd
235assert p.payload_length == 10
236assert p.payload_type == 0x8002
237assert p.source_address == 0
238assert p.target_address == 0
239assert p.ack_code == 0
240assert p.previous_msg == b'\x19\x02\t\x9c\x00'
241
242p = DoIP(b'\x02\xfd\x80\x02\x00\x00\x00\x07\x00\x08\x00\x0e\x00\x10\x01')
243assert p.protocol_version == 0x02
244assert p.inverse_version == 0xFD
245assert p.payload_length == 7
246assert p.payload_type == 0x8002
247assert p.source_address == 0x8
248assert p.target_address == 0xE
249assert p.ack_code == 0
250assert p.previous_msg == b'\x10\x01'
251
252= Build test 8003
253
254p = DoIP(bytes(DoIP(payload_type=0x8003)))
255
256assert p.protocol_version == 0x02
257assert p.inverse_version == 0xfd
258assert p.payload_length == 5
259assert p.payload_type == 0x8003
260assert p.source_address == 0
261assert p.target_address == 0
262assert p.nack_code == 0
263
264
265p = DoIP(bytes(DoIP(payload_type=0x8003, previous_msg=b'\x2E\xfd\x32\x01\x02')))
266
267assert p.protocol_version == 0x02
268assert p.inverse_version == 0xfd
269assert p.payload_length == 10
270assert p.payload_type == 0x8003
271assert p.source_address == 0
272assert p.target_address == 0
273assert p.nack_code == 0
274assert p.previous_msg == b'.\xfd2\x01\x02'
275
276p = DoIP(bytes(DoIP(payload_type=0x8003, previous_msg=b'\x19\x02\x09\x9A\x00')))
277
278assert p.protocol_version == 0x02
279assert p.inverse_version == 0xfd
280assert p.payload_length == 10
281assert p.payload_type == 0x8003
282assert p.source_address == 0
283assert p.target_address == 0
284assert p.nack_code == 0
285assert p.previous_msg == b'\x19\x02\t\x9a\x00'
286
287p = DoIP(b'\x02\xfd\x80\x03\x00\x00\x00\x07\x00\x0A\x00\x0C\x00\x10\x03')
288assert p.protocol_version == 0x02
289assert p.inverse_version == 0xFD
290assert p.payload_length == 7
291assert p.payload_type == 0x8003
292assert p.source_address == 0xA
293assert p.target_address == 0xC
294assert p.nack_code == 0
295assert p.previous_msg == b'\x10\x03'
296
297+ pcap based tests
298
299= read diag_ack pcap file
300pkt = rdpcap(scapy_path("test/pcaps/doip_ack.pcap")).res[0]
301
302assert len(pkt) == 70
303
304= dissect test of diag ACK with previous_msg field filled
305assert pkt.protocol_version == 0x02
306assert pkt.inverse_version == 0xFD
307assert pkt.payload_length == 8
308assert pkt.source_address == 0x4B
309assert pkt.target_address == 0xE00
310assert pkt.ack_code == 0
311assert pkt.previous_msg == b'\x22\xFD\x31'
312
313
314= read main pcap file
315
316pkts = rdpcap(scapy_path("test/pcaps/doip.pcap.gz"))
317ips = [p for p in pkts if p.proto == 6]
318
319assert len(ips) > 1
320
321= dissect test of routing activation pkts req
322
323req = ips[0]
324p = req
325assert p.protocol_version == 0x02
326assert p.inverse_version == 0xfd
327assert p.payload_length == 11
328assert p.payload_type == 0x5
329assert p.source_address == 0xe80
330assert p.activation_type == 0
331assert p.reserved_iso == 0
332assert p.reserved_oem == b"\x00\x00\x00\x00"
333
334= dissect test of routing activation pkts resp
335
336resp = ips[1]
337p = resp
338assert p.protocol_version == 0x02
339assert p.inverse_version == 0xfd
340assert p.payload_length == 9
341assert p.payload_type == 0x6
342assert p.logical_address_tester == 0xe80
343assert p.logical_address_doip_entity == 0x4010
344assert p.routing_activation_response == 16
345assert p.reserved_iso == 0
346
347= answers test of routing activation pkts
348
349assert resp.answers(req)
350assert resp.hashret() == req.hashret()
351
352= dissect diagnostic message
353
354req = ips[-4]
355resp = ips[-1]
356
357p = req
358assert p.protocol_version == 0x02
359assert p.inverse_version == 0xfd
360assert p.payload_length == 6
361assert p.payload_type == 0x8001
362assert p.source_address == 0xe80
363assert p.target_address == 0x4010
364assert bytes(p)[-2:] == bytes(UDS()/UDS_DSC(b"\x02"))
365assert p.service == 0x10
366assert p.diagnosticSessionType == 2
367
368p = resp
369assert p.protocol_version == 0x02
370assert p.inverse_version == 0xfd
371assert p.payload_length == 10
372assert p.payload_type == 0x8001
373assert p.target_address == 0xe80
374assert p.source_address == 0x4010
375assert bytes(p)[-6:] == bytes(UDS()/UDS_DSCPR(b"\x02\x002\x01\xf4"))
376assert p.service == 0x50
377assert p.diagnosticSessionType == 2
378
379assert req.hashret() == resp.hashret()
380# exclude TCP layer from answers check
381assert resp[3].answers(req[3])
382assert not req[3].answers(resp[3])
383
384= TCPSession Test
385
386tmp_file = get_temp_file()
387
388wrpcap(tmp_file, [
389    IP(src="10.10.10.10", dst="10.10.10.11") / TCP(sport=61000, seq=1) / DoIP(payload_type=0x8001, payload_length=6) / b"\x3E",
390    IP(src="10.10.10.10", dst="10.10.10.11") / TCP(sport=61000, dport=13400, seq=14) / Raw(load=b"\xff")
391])
392
393pkts = sniff(offline=tmp_file, session=TCPSession)
394assert pkts[0].haslayer(UDS_TP)
395assert pkts[0].service == 0x3E
396
397= TCPSession Test multiple DoIP messages
398
399filename = scapy_path("/test/pcaps/multiple_doip_layers.pcap.gz")
400
401pkts = sniff(offline=filename, session=TCPSession)
402print(repr(pkts[0]))
403print(repr(pkts[1]))
404assert len(pkts) == 2
405assert pkts[0][DoIP].payload_length == 2
406assert pkts[0][DoIP:2].payload_length == 7
407assert pkts[1][DoIP].payload_length == 103
408
409+ DoIP Communication tests
410
411= Load libraries
412import base64
413import ssl
414import tempfile
415
416= Test DoIPSocket
417
418server_up = threading.Event()
419sniff_up = threading.Event()
420def server():
421    buffer = b'\x02\xfd\x80\x02\x00\x00\x00\x05\x00\x00\x00\x00\x00\x02\xfd\x80\x01\x00\x00\x00\n\x10\x10\x0e\x80P\x03\x002\x01\xf4'
422    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
423    try:
424        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
425        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
426        sock.bind(('127.0.0.1', 13400))
427        sock.listen(1)
428        server_up.set()
429        connection, address = sock.accept()
430        sniff_up.wait(timeout=1)
431        connection.send(buffer)
432        connection.close()
433    finally:
434        sock.close()
435
436
437server_thread = threading.Thread(target=server)
438server_thread.start()
439server_up.wait(timeout=1)
440sock = DoIPSocket(activate_routing=False)
441
442pkts = sock.sniff(timeout=1, count=2, started_callback=sniff_up.set)
443server_thread.join(timeout=1)
444assert len(pkts) == 2
445
446
447= Test DoIPSocket 2
448~ linux
449
450server_up = threading.Event()
451sniff_up = threading.Event()
452def server():
453    buffer = b'\x02\xfd\x80\x02\x00\x00\x00\x05\x00\x00\x00\x00\x00\x02\xfd\x80\x01\x00\x00\x00\n\x10\x10\x0e\x80P\x03\x002\x01\xf4'
454    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
455    try:
456        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
457        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
458        sock.bind(('127.0.0.1', 13400))
459        sock.listen(1)
460        server_up.set()
461        connection, address = sock.accept()
462        sniff_up.wait(timeout=1)
463        for i in range(len(buffer)):
464            connection.send(buffer[i:i+1])
465            time.sleep(0.01)
466        connection.close()
467    finally:
468        sock.close()
469
470
471server_thread = threading.Thread(target=server)
472server_thread.start()
473server_up.wait(timeout=1)
474sock = DoIPSocket(activate_routing=False)
475
476pkts = sock.sniff(timeout=1, count=2, started_callback=sniff_up.set)
477server_thread.join(timeout=1)
478assert len(pkts) == 2
479
480= Test DoIPSocket 3
481
482server_up = threading.Event()
483sniff_up = threading.Event()
484def server():
485    buffer = b'\x02\xfd\x80\x02\x00\x00\x00\x05\x00\x00\x00\x00\x00\x02\xfd\x80\x01\x00\x00\x00\n\x10\x10\x0e\x80P\x03\x002\x01\xf4'
486    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
487    try:
488        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
489        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
490        sock.bind(('127.0.0.1', 13400))
491        sock.listen(1)
492        server_up.set()
493        connection, address = sock.accept()
494        sniff_up.wait(timeout=1)
495        while buffer:
496            randlen = random.randint(0, len(buffer))
497            connection.send(buffer[:randlen])
498            buffer = buffer[randlen:]
499            time.sleep(0.01)
500        connection.close()
501    finally:
502        sock.close()
503
504
505server_thread = threading.Thread(target=server)
506server_thread.start()
507server_up.wait(timeout=1)
508sock = DoIPSocket(activate_routing=False)
509
510pkts = sock.sniff(timeout=1, count=2, started_callback=sniff_up.set)
511server_thread.join(timeout=1)
512assert len(pkts) == 2
513
514
515= Test DoIPSocket6
516
517server_up = threading.Event()
518sniff_up = threading.Event()
519def server():
520    buffer = b'\x02\xfd\x80\x02\x00\x00\x00\x05\x00\x00\x00\x00\x00\x02\xfd\x80\x01\x00\x00\x00\n\x10\x10\x0e\x80P\x03\x002\x01\xf4'
521    sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
522    try:
523        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
524        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
525        sock.bind(('::1', 13400))
526        sock.listen(1)
527        server_up.set()
528        connection, address = sock.accept()
529        sniff_up.wait(timeout=1)
530        connection.send(buffer)
531        connection.close()
532    finally:
533        sock.close()
534
535
536server_thread = threading.Thread(target=server)
537server_thread.start()
538server_up.wait(timeout=1)
539sock = DoIPSocket(ip="::1", activate_routing=False)
540
541pkts = sock.sniff(timeout=1, count=2, started_callback=sniff_up.set)
542server_thread.join(timeout=1)
543assert len(pkts) == 2
544
545= Test DoIPSslSocket
546~ broken_windows
547
548certstring = """
549LS0tLS1CRUdJTiBQUklWQVRFIEtFWS0tLS0tCk1JSUV2QUlCQURBTkJna3Foa2lHOXcwQkFRRUZB
550QVNDQktZd2dnU2lBZ0VBQW9JQkFRRFUvK0hRbVpzSDl2QVcKQ3ZMQjRxalpnZFJSSXE1b2JBanB4
551YUhoUGxCVEMvUlBzMHIxRVF0V0FtbXNEZFE3UGlLaCtYa1hES3pNY3lJSQp1a0ZpNThUQW1idGFj
552N0U5VmJHSnNlTWp2RkJKSkFqQXVtbFdRZk5XcSs2TkZhdmRkTDQrSTNBTVJ5TldJTkJYCjhHMzRo
553dldIbDdTOGhhSFFZN0FXcUZWVTNVL2xKR2pubnF3MEJraEIvVGRCTWIwM0habzkrVjIrWU9RZmk5
554QWsKTVRSRXpSeWVObWJqT0sxbHpXdFJXWkZZU0RnMEtqUVh4SkdFNVc5MzFPWitHL1NkbytTM1ZW
555SVRPdWxQbHRmVwpXMEdjeCsvZERSNFIxNG5mcUl5L1daMElHUVNXMlRsQytmeGJ0dURDUkFqelRz
556b0J3YjJ0cnpoR0VtYVFveUtNCnpBKzVSUHNyQWdNQkFBRUNnZ0VBRUJHaEoyWm5OVHh5YVY5TnZY
557QjI1NDNZQnRUMGVSUHBhanJLMXg0bk1OU3oKNE9LNFVzWlo1MnBnTHRHT1EzZm1aS0l0cEo1WlY1
558cVBUejdwN3VjUzhnQWNZUnNJUnpCMHA5d3FpWExMK3h0RApxUjB4dnR4VDJpUGlFblVNNndudHpr
559SHpKK0g0QkZLT2FvdjNaK3Fha2E1UmFCcmhheGRuaDBDNklLQmZtM3cyCm5zUWI2N0lCYWwrSnBs
560L1g5TENWRkdRT2owb0lmVWI5ZFp3OWQ3MCthSGVVb2xvMGdYZmxxcXFFcnl3ZDlPN2QKNnp4dGlx
561cnRyZUJhK1IraWs3NE1SK0xvaFNVR3o2VTRQaXhWQ3l1SnQ2U0hvRHR2L3dtSnltWDd2a0FRS2w1
562RQplK1JqUGVyakpUWTNzNXNXbEd2V21UTEtEbnVyS2pBYzZUOHhKb0pXWlFLQmdRRHdsd2RRdmww
563S28wNHhDUmtiCklYRGVJZE1jZkp2ejRGZEtka1BmVnZVT2xHVEpNZkRzbWNoUzZhcEJCQUdQMUU2
564VkN2VzJmUFdjaXhScHE3MW8KR2xtbWZ5RnlJRW0rL08yamMvSFRXWHp6Qjdoc0JISEltQklHczFU
565TC9iWFU3amhVQW5kWDdMK3RSRDBKNWRGVwpiN1VOOXNxaWdtRG42REJWZkxaUHgxRnlWUUtCZ1FE
566aXBIT1BhNmVMSlk5R1FZdkw3OTIyTHNoU3ZYSUFVMERGCjBabTlqbjM2b3ZIY0kvWEZDdHVXank2
567WG9wbk9pbjlycmtUY2FDUnBvSEFNb00ycHdiR0tFY0dVVEY2RHQ3akYKRHVnd2srR21sbDkrbjM2
568M3Iwb09YNktSbWFhRStiZHoyNjNQVEhMaktYUnFyc3h5WEtMT3ZyTXhVNWNzMXJCeQpTMWI2ZGhr
569M2Z3S0JnRjlONUliMnNkS3ArQ3B5aVRCM0ljZk1yRjBuZTN1ekRjRWdjaWlCd05lQ3J4NElHNEVP
570Ck5nMnFKRmhXNXV0NzFaa3kyenpyNlR1VzJJSTNsdk1ySlFKUWNBWk9oZ2dURjJ2ZFhSazA1TXM4
571N3JCVFhtTncKNGdzbmROck42UDZ0VTBEc0xTeDJTME91dVdNM1Y2S2U0NkRoZDBuQ3pmSnZ4dDNH
572WmszYURnaDFBb0dBWFhIcQpoNDZlZEx1V3VDUGNUTWhvUkc1RGdBSEdHQ1k3UlpTbTY4WHRZVUov
573c0FGUG10OWdMRko2cG1DUFE5NU1yUXdjCkxqZnVFM0xuMy8wSTd0NENvbWV4eGNBN0U5blRIOFNH
574clVpN3QrQzJITklNQUJZUTFaNU91L042K2Nhd0FkL28KYU5rZllWTzlRU015L2svOWZIcWFEVk5t
575dUVFSVhRZDlKQ1UvUG1jQ2dZQWI0RTBRWTdDZmlrV293OFIzSlhoZgo0MHFVVkdud09QKzJNbXE5
576d2ZmWkpTRHNFSTQvb2g0VGRnN0sybHNNazVsWnRaMyszTjljSDVUc1pMYlJtd2FMCm9sRVl6K1BB
577WU91MlMrY1l2bFlNL0V2WmlpRHJybjZuTStNbTNnaXJPYkNwMzcxd1ZxRFVsUnB4OUlwWVdYcnAK
578T3YxUXFHdXkwODdyQkk1cStWL3hqQT09Ci0tLS0tRU5EIFBSSVZBVEUgS0VZLS0tLS0KLS0tLS1C
579RUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUQ3VENDQXRXZ0F3SUJBZ0lVVTNsendsTVNSa294Tkdk
580SFJzZllIcUtxcDAwd0RRWUpLb1pJaHZjTkFRRUwKQlFBd2dZVXhDekFKQmdOVkJBWVRBa1JGTVJN
581d0VRWURWUVFJREFwVGIyMWxMVk4wWVhSbE1Rd3dDZ1lEVlFRSApEQU5TUlVjeEVUQVBCZ05WQkFv
582TUNHUnBjM05sWTNSdk1Rd3dDZ1lEVlFRTERBTkVSVll4RFRBTEJnTlZCQU1NCkJGUkZVMVF4SXpB
583aEJna3Foa2lHOXcwQkNRRVdGR052Ym5SaFkzUXRkWE5BWkdsemMyVmpMblJ2TUI0WERUSTAKTURN
584eE9ERTVNek13TlZvWERUSTBNRFF4TnpFNU16TXdOVm93Z1lVeEN6QUpCZ05WQkFZVEFrUkZNUk13
585RVFZRApWUVFJREFwVGIyMWxMVk4wWVhSbE1Rd3dDZ1lEVlFRSERBTlNSVWN4RVRBUEJnTlZCQW9N
586Q0dScGMzTmxZM1J2Ck1Rd3dDZ1lEVlFRTERBTkVSVll4RFRBTEJnTlZCQU1NQkZSRlUxUXhJekFo
587QmdrcWhraUc5dzBCQ1FFV0ZHTnYKYm5SaFkzUXRkWE5BWkdsemMyVmpMblJ2TUlJQklqQU5CZ2tx
588aGtpRzl3MEJBUUVGQUFPQ0FROEFNSUlCQ2dLQwpBUUVBMVAvaDBKbWJCL2J3RmdyeXdlS28yWUhV
589VVNLdWFHd0k2Y1doNFQ1UVV3djBUN05LOVJFTFZnSnByQTNVCk96NGlvZmw1Rnd5c3pITWlDTHBC
590WXVmRXdKbTdXbk94UFZXeGliSGpJN3hRU1NRSXdMcHBWa0h6VnF2dWpSV3IKM1hTK1BpTndERWNq
591VmlEUVYvQnQrSWIxaDVlMHZJV2gwR093RnFoVlZOMVA1U1JvNTU2c05BWklRZjAzUVRHOQpOeDJh
592UGZsZHZtRGtINHZRSkRFMFJNMGNualptNHppdFpjMXJVVm1SV0VnNE5DbzBGOFNSaE9WdmQ5VG1m
593aHYwCm5hUGt0MVZTRXpycFQ1YlgxbHRCbk1mdjNRMGVFZGVKMzZpTXYxbWRDQmtFbHRrNVF2bjhX
594N2Jnd2tRSTgwN0sKQWNHOXJhODRSaEpta0tNaWpNd1B1VVQ3S3dJREFRQUJvMU13VVRBZEJnTlZI
595UTRFRmdRVVZhbUFkUjR1ZW8zQgpmV0RjUlMyUkQ3OEtlZXd3SHdZRFZSMGpCQmd3Rm9BVVZhbUFk
596UjR1ZW8zQmZXRGNSUzJSRDc4S2Vld3dEd1lEClZSMFRBUUgvQkFVd0F3RUIvekFOQmdrcWhraUc5
597dzBCQVFzRkFBT0NBUUVBRjE1TTNvL3RyUVdYeHdHamlxZjgKNXBUTEM0bHJwQkZaTFZDbStQdHd4
598aENlN1ZSd2dLMElBb01EMW0vSjNEYnVJSjVURXlTVElnR2N0WHVNbG5pWgpsY3IwekZOZVVhQ08w
599YkdhaExYUXpCWTRxSkhTTUNWNnhiNXNqUDlEdk9HYnFxbHVTbk51ZFJ5UWNIbkd4SE0rCk1adXpO
600WUNseklOMEtYbFJuSTZqRXUrcG9XZ0pEMGN1NFM2b1lwT2R3bElRYmtaNnIrUE1jQ3hpRmhRd3E2
601em4KcE1nQzB0WlpSM3pCOEpVcTJwRHlGVy9jVlFjWkp5YUhnQkkwWlJWWG5wbDFqYng2YlNIOCts
602cnMxVk1xZDlkcQozd1BMcjBheWI2VkpNa29WMjNWSXAzLzlYQVpTR3Z6Y0dadnM2VThSUTdFbUtx
603akJibWxudm1CTkpUMk9xbFFRCllRPT0KLS0tLS1FTkQgQ0VSVElGSUNBVEUtLS0tLQo="""
604
605certstring = certstring.replace('\n', '')
606
607def _load_certificate_chain(context) -> None:
608    with tempfile.NamedTemporaryFile(delete=False) as fp:
609        fp.write(base64.b64decode(certstring))
610        fp.close()
611        context.load_cert_chain(fp.name)
612
613
614server_up = threading.Event()
615sniff_up = threading.Event()
616def server():
617    context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
618    _load_certificate_chain(context)
619    context.check_hostname = False
620    context.verify_mode = ssl.CERT_NONE
621    buffer = b'\x02\xfd\x80\x02\x00\x00\x00\x05\x00\x00\x00\x00\x00\x02\xfd\x80\x01\x00\x00\x00\n\x10\x10\x0e\x80P\x03\x002\x01\xf4'
622    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
623    sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
624    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
625    ssock = context.wrap_socket(sock)
626    try:
627        ssock.bind(('127.0.0.1', 3496))
628        ssock.listen(1)
629        server_up.set()
630        connection, address = ssock.accept()
631        sniff_up.wait(timeout=1)
632        connection.send(buffer)
633        connection.close()
634    finally:
635        ssock.close()
636
637
638server_thread = threading.Thread(target=server)
639server_thread.start()
640server_up.wait(timeout=1)
641context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
642context.check_hostname = False
643context.verify_mode = ssl.CERT_NONE
644sock = DoIPSocket(activate_routing=False, force_tls=True, context=context)
645
646pkts = sock.sniff(timeout=1, count=2, started_callback=sniff_up.set)
647server_thread.join(timeout=1)
648assert len(pkts) == 2
649
650= Test DoIPSslSocket6
651~ broken_windows
652
653server_up = threading.Event()
654sniff_up = threading.Event()
655def server():
656    context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
657    _load_certificate_chain(context)
658    context.check_hostname = False
659    context.verify_mode = ssl.CERT_NONE
660    buffer = b'\x02\xfd\x80\x02\x00\x00\x00\x05\x00\x00\x00\x00\x00\x02\xfd\x80\x01\x00\x00\x00\n\x10\x10\x0e\x80P\x03\x002\x01\xf4'
661    sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
662    sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
663    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
664    ssock = context.wrap_socket(sock)
665    try:
666        ssock.bind(('::1', 3496))
667        ssock.listen(1)
668        server_up.set()
669        connection, address = ssock.accept()
670        sniff_up.wait(timeout=1)
671        connection.send(buffer)
672        connection.close()
673    finally:
674        ssock.close()
675
676
677server_thread = threading.Thread(target=server)
678server_thread.start()
679server_up.wait(timeout=1)
680context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
681context.check_hostname = False
682context.verify_mode = ssl.CERT_NONE
683sock = DoIPSocket(ip="::1", activate_routing=False, force_tls=True, context=context)
684
685pkts = sock.sniff(timeout=1, count=2, started_callback=sniff_up.set)
686server_thread.join(timeout=1)
687assert len(pkts) == 2
688
689= Test UDS_DoIPSslSocket6
690~ broken_windows
691
692server_up = threading.Event()
693sniff_up = threading.Event()
694def server():
695    context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
696    _load_certificate_chain(context)
697    context.check_hostname = False
698    context.verify_mode = ssl.CERT_NONE
699    buffer = b'\x02\xfd\x80\x02\x00\x00\x00\x05\x00\x00\x00\x00\x00\x02\xfd\x80\x01\x00\x00\x00\n\x10\x10\x0e\x80P\x03\x002\x01\xf4'
700    sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
701    sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
702    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
703    ssock = context.wrap_socket(sock)
704    try:
705        ssock.bind(('::1', 3496))
706        ssock.listen(1)
707        server_up.set()
708        connection, address = ssock.accept()
709        sniff_up.wait(timeout=1)
710        connection.send(buffer)
711        connection.close()
712    finally:
713        ssock.close()
714
715
716server_thread = threading.Thread(target=server)
717server_thread.start()
718server_up.wait(timeout=1)
719context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
720context.check_hostname = False
721context.verify_mode = ssl.CERT_NONE
722sock = UDS_DoIPSocket(ip="::1", activate_routing=False, force_tls=True, context=context)
723
724pkts = sock.sniff(timeout=1, count=2, started_callback=sniff_up.set)
725server_thread.join(timeout=1)
726assert len(pkts) == 2
727
728= Test UDS_DualDoIPSslSocket6
729~ broken_windows not_pypy
730
731server_tcp_up = threading.Event()
732server_tls_up = threading.Event()
733sniff_up = threading.Event()
734def server_tls():
735    context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
736    _load_certificate_chain(context)
737    context.check_hostname = False
738    context.verify_mode = ssl.CERT_NONE
739    buffer = bytes.fromhex("02fd0006000000090e8011061000000000")
740    buffer += b'\x02\xfd\x80\x02\x00\x00\x00\x05\x00\x00\x00\x00\x00\x02\xfd\x80\x01\x00\x00\x00\n\x10\x10\x0e\x80P\x03\x002\x01\xf4'
741    sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
742    sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
743    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
744    ssock = context.wrap_socket(sock)
745    try:
746        ssock.bind(('::1', 3496))
747        ssock.listen(1)
748        server_tls_up.set()
749        connection, address = ssock.accept()
750        sniff_up.wait(timeout=1)
751        connection.send(buffer)
752        connection.close()
753    finally:
754        ssock.close()
755
756def server_tcp():
757    buffer = bytes.fromhex("02fd0006000000090e8011060700000000")
758    sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
759    try:
760        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
761        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
762        sock.bind(('::1', 13400))
763        sock.listen(1)
764        server_tcp_up.set()
765        connection, address = sock.accept()
766        connection.send(buffer)
767        connection.shutdown(socket.SHUT_RDWR)
768        connection.close()
769    finally:
770        sock.close()
771
772
773server_tcp_thread = threading.Thread(target=server_tcp)
774server_tcp_thread.start()
775server_tcp_up.wait(timeout=1)
776server_tls_thread = threading.Thread(target=server_tls)
777server_tls_thread.start()
778server_tls_up.wait(timeout=1)
779context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
780context.check_hostname = False
781context.verify_mode = ssl.CERT_NONE
782
783
784sock = UDS_DoIPSocket(ip="::1", context=context)
785
786pkts = sock.sniff(timeout=1, count=2, started_callback=sniff_up.set)
787server_tcp_thread.join(timeout=1)
788server_tls_thread.join(timeout=1)
789assert len(pkts) == 2
790