• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1% Regression tests for ISOTPSoftSocket
2~ automotive_comm
3
4+ Configuration
5~ conf
6
7= Imports
8import time
9from io import BytesIO
10from scapy.layers.can import *
11from scapy.contrib.isotp import *
12from scapy.contrib.isotp.isotp_soft_socket import TimeoutScheduler
13from test.testsocket import TestSocket, cleanup_testsockets
14with open(scapy_path("test/contrib/automotive/interface_mockup.py")) as f:
15    exec(f.read())
16
17= Redirect logging
18import logging
19from scapy.error import log_runtime
20
21from io import StringIO
22
23log_stream = StringIO()
24handler = logging.StreamHandler(log_stream)
25log_runtime.addHandler(handler)
26log_isotp.addHandler(handler)
27
28= Definition of utility functions
29
30# hexadecimal to bytes convenience function
31dhex = bytes.fromhex
32
33
34+ Test sniffer
35= Test sniffer with multiple frames
36
37test_frames = [
38    (0x241, "EA 10 28 01 02 03 04 05"),
39    (0x641, "EA 30 03 00"            ),
40    (0x241, "EA 21 06 07 08 09 0A 0B"),
41    (0x241, "EA 22 0C 0D 0E 0F 10 11"),
42    (0x241, "EA 23 12 13 14 15 16 17"),
43    (0x641, "EA 30 03 00"            ),
44    (0x241, "EA 24 18 19 1A 1B 1C 1D"),
45    (0x241, "EA 25 1E 1F 20 21 22 23"),
46    (0x241, "EA 26 24 25 26 27 28"   ),
47]
48
49with TestSocket(CAN) as s, TestSocket(CAN) as tx_sock:
50    s.pair(tx_sock)
51    for f in test_frames:
52        tx_sock.send(CAN(identifier=f[0], data=dhex(f[1])))
53    sniffed = sniff(opened_socket=s, session=ISOTPSession, timeout=1, count=1)
54
55assert sniffed[0]['ISOTP'].data == bytearray(range(1, 0x29))
56assert sniffed[0]['ISOTP'].tx_id == 0x641
57assert sniffed[0]['ISOTP'].ext_address == 0xEA
58assert sniffed[0]['ISOTP'].rx_id == 0x241
59assert sniffed[0]['ISOTP'].rx_ext_address == 0xEA
60
61+ ISOTPSoftSocket tests
62
63= CAN socket FD
64~ not_pypy needs_root linux vcan_socket
65
66with ISOTPSoftSocket(iface0, tx_id=0x641, rx_id=0x241, fd=True) as s:
67    assert s.impl.can_socket.fd == True
68
69= CAN socket non-FD
70~ not_pypy needs_root linux vcan_socket
71
72with ISOTPSoftSocket(iface0, tx_id=0x641, rx_id=0x241) as s:
73    assert s.impl.can_socket.fd == False
74
75= Single-frame receive
76
77with TestSocket(CAN) as cans, TestSocket(CAN) as stim, ISOTPSoftSocket(cans, tx_id=0x641, rx_id=0x241) as s:
78    cans.pair(stim)
79    stim.send(CAN(identifier=0x241, data=dhex("05 01 02 03 04 05")))
80    pkts = s.sniff(count=1, timeout=1)
81    if not len(pkts):
82        s.failure_analysis()
83        raise Scapy_Exception("ERROR")
84    msg = pkts[0]
85    assert msg.data == dhex("01 02 03 04 05")
86
87= Single-frame receive FD
88
89with TestSocket(CANFD) as cans, TestSocket(CANFD) as stim, ISOTPSoftSocket(cans, tx_id=0x641, rx_id=0x241, fd=True) as s:
90    pl_sizes_testings = [1, 5, 7, 8, 15, 20, 35, 40, 46, 62]
91    data_str = ""
92    data_str_offset = 0
93    cans.pair(stim)
94    for size_to_send in pl_sizes_testings:
95        if size_to_send > 7:
96            data_str = "00 {} {}".format("%02X" % size_to_send, " ".join(["%02X" % x for x in range(size_to_send)]))
97            data_str_offset = 6
98        else:
99            data_str = "{} {}".format("%02X" % size_to_send, " ".join(["%02X" % x for x in range(size_to_send)]))
100            data_str_offset = 2
101        stim.send(CANFD(identifier=0x241, data=dhex(data_str)))
102        pkts = s.sniff(count=1, timeout=1)
103        if not len(pkts):
104            s.failure_analysis()
105            raise Scapy_Exception("ERROR")
106        msg = pkts[0]
107        assert msg.data == dhex(data_str[data_str_offset:])
108
109= Single-frame send
110
111with TestSocket(CAN) as cans, TestSocket(CAN) as stim, ISOTPSoftSocket(cans, tx_id=0x641, rx_id=0x241) as s:
112    cans.pair(stim)
113    s.send(ISOTP(dhex("01 02 03 04 05")))
114    pkts = stim.sniff(count=1, timeout=1)
115    if not len(pkts):
116        s.failure_analysis()
117        raise Scapy_Exception("ERROR")
118    msg = pkts[0]
119    assert msg.data == dhex("05 01 02 03 04 05")
120
121= Single-frame send FD
122
123with TestSocket(CANFD) as cans, TestSocket(CANFD) as stim, ISOTPSoftSocket(cans, tx_id=0x641, rx_id=0x241, fd=True) as s:
124    pl_sizes_testings = [1, 5, 7, 8, 15, 20, 35, 40, 46, 62]
125    data_str = ""
126    data_str_offset = 0
127    cans.pair(stim)
128    for size_to_send in pl_sizes_testings:
129        if size_to_send > 7:
130            data_str = "00 {} {}".format("%02X" % size_to_send, " ".join(["%02X" % x for x in range(size_to_send)]))
131            data_str_offset = 6
132        else:
133            data_str = "{} {}".format("%02X" % size_to_send, " ".join(["%02X" % x for x in range(size_to_send)]))
134            data_str_offset = 2
135        s.send(ISOTP(dhex(data_str[data_str_offset:])))
136        pkts = stim.sniff(count=1, timeout=1)
137        if not len(pkts):
138            s.failure_analysis()
139            raise Scapy_Exception("ERROR")
140        msg = pkts[0]
141        assert msg.data == dhex(data_str)
142
143= Two frame receive
144
145with TestSocket(CAN) as cans, TestSocket(CAN) as stim, ISOTPSoftSocket(cans, tx_id=0x641, rx_id=0x241) as s:
146    cans.pair(stim)
147    stim.send(CAN(identifier=0x241, data=dhex("10 09 01 02 03 04 05 06")))
148    pkts = stim.sniff(count=1, timeout=1)
149    if not len(pkts):
150        s.failure_analysis()
151        raise Scapy_Exception("ERROR")
152    c = pkts[0]
153    assert (c.data == dhex("30 00 00"))
154    stim.send(CAN(identifier=0x241, data=dhex("21 07 08 09 00 00 00 00")))
155    pkts = s.sniff(count=1, timeout=1)
156    if not len(pkts):
157        s.failure_analysis()
158        raise Scapy_Exception("ERROR")
159    msg = pkts[0]
160    assert msg.data == dhex("01 02 03 04 05 06 07 08 09")
161
162
163= Two frame receive FD
164
165with TestSocket(CANFD) as cans, TestSocket(CANFD) as stim, ISOTPSoftSocket(cans, tx_id=0x641, rx_id=0x241, fd=True) as s:
166    cans.pair(stim)
167    stim.send(CANFD(identifier=0x241, data=dhex("10 09 01 02 03 04 05 06 07 08 09 0A 0B")))
168    pkts = stim.sniff(count=1, timeout=1)
169    if not len(pkts):
170        s.failure_analysis()
171        raise Scapy_Exception("ERROR")
172    c = pkts[0]
173    assert (c.data == dhex("30 00 00"))
174    stim.send(CANFD(identifier=0x241, data=dhex("21 07 08 09 00 00 00 00")))
175    pkts = s.sniff(count=1, timeout=1)
176    if not len(pkts):
177        s.failure_analysis()
178        raise Scapy_Exception("ERROR")
179    msg = pkts[0]
180    assert msg.data == dhex("01 02 03 04 05 06 07 08 09")
181
182
183= 20000 bytes receive
184
def test():
    """Receive a 20000-byte ISOTP message through an ISOTPSoftSocket and check the reassembled payload."""
    with TestSocket(CAN) as cans, TestSocket(CAN) as stim, ISOTPSoftSocket(cans, tx_id=0x641, rx_id=0x241) as s:
        cans.pair(stim)
        data = dhex("01 02 03 04 05") * 4000
        cf = ISOTP(data, rx_id=0x241).fragment()
        ff = cf.pop(0)
        # The first frame is sent from started_callback, i.e. only after the
        # sniffer on stim is running, so the flow-control reply is captured.
        cs = stim.sniff(count=1, timeout=3, started_callback=lambda: stim.send(ff))
        if not len(cs):
            s.failure_analysis()
            raise Scapy_Exception("ERROR")
        c = cs[0]
        assert (c.data == dhex("30 00 00"))
        # Push all remaining (consecutive) frames to the soft socket.
        for f in cf:
            stim.send(f)
        msgs = s.sniff(count=1, timeout=30)
        print(msgs)
        # Same diagnostics as the other receive tests instead of a bare IndexError.
        if not len(msgs):
            s.failure_analysis()
            raise Scapy_Exception("ERROR")
        msg = msgs[0]
        assert msg.data == data
201
202test()
203
204= 20000 bytes send
205
def test():
    """Send a 20000-byte ISOTP message and compare every emitted CAN frame with the output of ISOTP.fragment()."""
    with TestSocket(CAN) as cans, TestSocket(CAN) as stim, ISOTPSoftSocket(cans, tx_id=0x641, rx_id=0x241) as s:
        cans.pair(stim)
        data = dhex("01 02 03 04 05")*4000
        msg = ISOTP(data, rx_id=0x641)
        fragments = msg.fragment()
        ack = CAN(identifier=0x241, data=dhex("30 00 00"))
        # Trigger the transmission only once the sniffer is running.
        ff = stim.sniff(timeout=1, count=1,
                        started_callback=lambda: s.send(msg))
        assert len(ff) == 1
        # Answer the first frame with a flow-control frame and collect the rest.
        cfs = stim.sniff(timeout=20, count=len(fragments) - 1,
                         started_callback=lambda: stim.send(ack))
        # zip() stops at the shorter sequence: without this check a sniff that
        # timed out early would let the comparison loop pass on a partial transfer.
        if len(cfs) != len(fragments) - 1:
            s.failure_analysis()
            raise Scapy_Exception("ERROR")
        for fragment, cf in zip(fragments, ff + cfs):
            assert (bytes(fragment) == bytes(cf))
220
221test()
222
223= 20000 bytes send FD
224
def testfd():
    """Send a 20030-byte ISOTP message over CAN FD and compare every emitted frame with ISOTP.fragment(fd=True)."""
    with TestSocket(CANFD) as cans, TestSocket(CANFD) as stim, ISOTPSoftSocket(cans, tx_id=0x641, rx_id=0x241, fd=True) as s:
        cans.pair(stim)
        data = dhex("01 02 03 04 05")*4006
        msg = ISOTP(data, rx_id=0x641)
        fragments = msg.fragment(fd=True)
        ack = CANFD(identifier=0x241, data=dhex("30 00 00"))
        # Trigger the transmission only once the sniffer is running.
        ff = stim.sniff(timeout=1, count=1,
                        started_callback=lambda: s.send(msg))
        assert len(ff) == 1
        # Answer the first frame with a flow-control frame and collect the rest.
        cfs = stim.sniff(timeout=20, count=len(fragments) - 1,
                         started_callback=lambda: stim.send(ack))
        # zip() stops at the shorter sequence: without this check a sniff that
        # timed out early would let the comparison loop pass on a partial transfer.
        if len(cfs) != len(fragments) - 1:
            s.failure_analysis()
            raise Scapy_Exception("ERROR")
        for fragment, cf in zip(fragments, ff + cfs):
            assert (bytes(fragment) == bytes(cf))
239
240testfd()
241
242= Close ISOTPSoftSocket
243
244with TestSocket(CAN) as cans, TestSocket(CAN) as stim, ISOTPSoftSocket(cans, tx_id=0x641, rx_id=0x241) as s:
245    cans.pair(stim)
246    s.close()
247    s = None
248
249= Test on_recv function with single frame
250with ISOTPSoftSocket(TestSocket(CAN), tx_id=0x641, rx_id=0x241) as s:
251    s.ins.on_recv(CAN(identifier=0x241, data=dhex("05 01 02 03 04 05")))
252    msg, ts = s.ins.rx_queue.recv()
253    assert msg == dhex("01 02 03 04 05")
254
255= Test on_recv function with single frame FD
256with ISOTPSoftSocket(TestSocket(CANFD), tx_id=0x641, rx_id=0x241, fd=True) as s:
257    pl_sizes_testings = [1, 5, 7, 8, 15, 20, 35, 40, 46, 62]
258    data_str = ""
259    data_str_offset = 0
260    for size_to_send in pl_sizes_testings:
261        if size_to_send > 7:
262            data_str = "00 {} {}".format("%02X" % size_to_send, " ".join(["%02X" % x for x in range(size_to_send)]))
263            data_str_offset = 6
264        else:
265            data_str = "{} {}".format("%02X" % size_to_send, " ".join(["%02X" % x for x in range(size_to_send)]))
266            data_str_offset = 2
267        s.ins.on_recv(CANFD(identifier=0x241, data=dhex(data_str)))
268        msg, ts = s.ins.rx_queue.recv()
269        assert msg == dhex(data_str[data_str_offset:])
270
271= Test on_recv function with empty frame
272with ISOTPSoftSocket(TestSocket(CAN), tx_id=0x641, rx_id=0x241) as s:
273    s.ins.on_recv(CAN(identifier=0x241, data=b""))
274    assert s.ins.rx_queue.empty()
275
276= Test on_recv function with single frame and extended addressing
277with ISOTPSoftSocket(TestSocket(CAN), tx_id=0x641, rx_id=0x241, rx_ext_address=0xea) as s:
278    cf = CAN(identifier=0x241, data=dhex("EA 05 01 02 03 04 05"))
279    s.ins.on_recv(cf)
280    msg, ts = s.ins.rx_queue.recv()
281    assert msg == dhex("01 02 03 04 05")
282    assert ts == cf.time
283
284
285= Test on_recv function with single frame and extended addressing FD
286with ISOTPSoftSocket(TestSocket(CANFD), tx_id=0x641, rx_id=0x241, rx_ext_address=0xea, fd=True) as s:
287    pl_sizes_testings = [1, 5, 7, 8, 15, 20, 35, 40, 46, 62]
288    data_str = ""
289    data_str_offset = 0
290    for size_to_send in pl_sizes_testings:
291        if size_to_send > 7:
292            data_str = "EA 00 {} {}".format("%02X" % size_to_send, " ".join(["%02X" % x for x in range(size_to_send)]))
293            data_str_offset = 8
294        else:
295            data_str = "EA {} {}".format("%02X" % size_to_send, " ".join(["%02X" % x for x in range(size_to_send)]))
296            data_str_offset = 5
297        cf = CANFD(identifier=0x241, data=dhex(data_str))
298        s.ins.on_recv(cf)
299        msg, ts = s.ins.rx_queue.recv()
300        assert msg == dhex(data_str[data_str_offset:])
301        assert ts == cf.time
302
303= CF is sent when first frame is received
# Both CAN test sockets are managed by the with statement so they are closed
# even when one of the assertions below fails.
with TestSocket(CAN) as cans, TestSocket(CAN) as can_out:
    cans.pair(can_out)
    with ISOTPSoftSocket(cans, tx_id=0x641, rx_id=0x241) as s:
        # Feed a first frame directly into the receive path ...
        s.ins.on_recv(CAN(identifier=0x241, data=dhex("10 20 01 02 03 04 05 06")))
        # ... and expect a flow-control frame on the paired socket.
        can = can_out.sniff(timeout=1, count=1)[0]
        assert can.identifier == 0x641
        assert can.data == dhex("30 00 00")
315
316+ Testing ISOTPSoftSocket with an actual CAN socket
317
318= Verify that packets are not lost if they arrive before the sniff() is called
319with TestSocket(CAN) as ss, TestSocket(CAN) as sr:
320    ss.pair(sr)
321    tx_func = lambda: ss.send(CAN(identifier=0x111, data=b"\x01\x23\x45\x67"))
322    p = sr.sniff(count=1, timeout=0.2, started_callback=tx_func)
323    assert len(p)==1
324    tx_func = lambda: ss.send(CAN(identifier=0x111, data=b"\x89\xab\xcd\xef"))
325    p = sr.sniff(count=1, timeout=0.2, started_callback=tx_func)
326    assert len(p)==1
327
328= Send single frame ISOTP message, using send
329with TestSocket(CAN) as isocan, \
330        ISOTPSoftSocket(isocan, tx_id=0x641, rx_id=0x241) as s, \
331        TestSocket(CAN) as cans:
332    cans.pair(isocan)
333    can = cans.sniff(timeout=2, count=1, started_callback=lambda: s.send(ISOTP(data=dhex("01 02 03 04 05"))))
334    assert can[0].identifier == 0x641
335    assert can[0].data == dhex("05 01 02 03 04 05")
336
337= Send many single frame ISOTP messages, using send
338
339with TestSocket(CAN) as isocan, \
340        ISOTPSoftSocket(isocan, tx_id=0x641, rx_id=0x241) as s, \
341        TestSocket(CAN) as cans:
342    cans.pair(isocan)
343    for i in range(100):
344        data = dhex("01 02 03 04 05") + struct.pack("B", i)
345        expected = struct.pack("B", len(data)) + data
346        can = cans.sniff(timeout=4, count=1, started_callback=lambda: s.send(ISOTP(data=data)))
347        assert can[0].identifier == 0x641
348        print(can[0].data, data)
349        assert can[0].data == expected
350
351
352= Send two-frame ISOTP message, using send
353with TestSocket(CAN) as isocan, ISOTPSoftSocket(isocan, tx_id=0x641, rx_id=0x241) as s, TestSocket(CAN) as cans:
354    cans.pair(isocan)
355    can = cans.sniff(timeout=1, count=1, started_callback=lambda: s.send(ISOTP(data=dhex("01 02 03 04 05 06 07 08"))))
356    assert can[0].identifier == 0x641
357    assert can[0].data == dhex("10 08 01 02 03 04 05 06")
358    can = cans.sniff(timeout=1, count=1, started_callback=lambda: cans.send(CAN(identifier = 0x241, data=dhex("30 00 00"))))
359    assert can[0].identifier == 0x641
360    assert can[0].data == dhex("21 07 08")
361
362= Send two-frame ISOTP message, using send FD
363with TestSocket(CANFD) as isocan, ISOTPSoftSocket(isocan, tx_id=0x641, rx_id=0x241, fd=True) as s, TestSocket(CANFD) as cans:
364    size_to_send = 100
365    max_pl_size = 62
366    data_str = "{}".format(" ".join(["%02X" % x for x in range(size_to_send)]))
367    cans.pair(isocan)
368    can = cans.sniff(timeout=1, count=1, started_callback=lambda: s.send(dhex(data_str)))
369    assert can[0].identifier == 0x641
370    assert can[0].data == dhex("10 {} {}".format("%02X" % size_to_send, " ".join(["%02X" % x for x in range(max_pl_size)])))
371    can = cans.sniff(timeout=1, count=1, started_callback=lambda: cans.send(CANFD(identifier = 0x241, data=dhex("30 00 00"))))
372    assert can[0].identifier == 0x641
373    assert can[0].data == dhex("21 {}".format(" ".join(["%02X" % x for x in range(max_pl_size, size_to_send)])))
374
375= Send single frame ISOTP message
376with TestSocket(CAN) as cans, TestSocket(CAN) as isocan, ISOTPSoftSocket(isocan, tx_id=0x641, rx_id=0x241) as s:
377    cans.pair(isocan)
378    s.send(ISOTP(data=dhex("01 02 03 04 05")))
379    can = cans.sniff(timeout=1, count=1)
380    assert can[0].identifier == 0x641
381    assert can[0].data == dhex("05 01 02 03 04 05")
382
383
384= Send two-frame ISOTP message
385
386acks = TestSocket(CAN)
387
388acker_ready = threading.Event()
389def acker():
390    acker_ready.set()
391    can_pkt = acks.sniff(timeout=1, count=1)
392    can = can_pkt[0]
393    acks.send(CAN(identifier = 0x241, data=dhex("30 00 00")))
394
395thread = Thread(target=acker)
396thread.start()
397acker_ready.wait(timeout=5)
398with TestSocket(CAN) as isocan, ISOTPSoftSocket(isocan, tx_id=0x641, rx_id=0x241) as s, TestSocket(CAN) as cans:
399    cans.pair(isocan)
400    cans.pair(acks)
401    isocan.pair(acks)
402    s.send(ISOTP(data=dhex("01 02 03 04 05 06 07 08")))
403    pkts = cans.sniff(timeout=1, count=1)
404    if not len(pkts):
405        s.failure_analysis()
406        raise Scapy_Exception("ERROR")
407    can = pkts[0]
408    assert can.identifier == 0x641
409    assert can.data == dhex("10 08 01 02 03 04 05 06")
410    pkts = cans.sniff(timeout=1, count=1)
411    if not len(pkts):
412        s.failure_analysis()
413        raise Scapy_Exception("ERROR")
414    can = pkts[0]
415    assert can.identifier == 0x241
416    assert can.data == dhex("30 00 00")
417    pkts = cans.sniff(timeout=1, count=1)
418    if not len(pkts):
419        s.failure_analysis()
420        raise Scapy_Exception("ERROR")
421    can = pkts[0]
422    assert can.identifier == 0x641
423    assert can.data == dhex("21 07 08")
424
425thread.join(15)
426acks.close()
427assert not thread.is_alive()
428
429= Send two-frame ISOTP message FD
430
431acks = TestSocket(CANFD)
432
433acker_ready = threading.Event()
434def acker():
435    acker_ready.set()
436    can_pkt = acks.sniff(timeout=1, count=1)
437    can = can_pkt[0]
438    acks.send(CANFD(identifier = 0x241, data=dhex("30 00 00")))
439
440thread = Thread(target=acker)
441thread.start()
442acker_ready.wait(timeout=5)
443with TestSocket(CANFD) as isocan, ISOTPSoftSocket(isocan, tx_id=0x641, rx_id=0x241, fd=True) as s, TestSocket(CANFD) as cans:
444    size_to_send = 123
445    max_pl_size = 62
446    data_str = "{}".format(" ".join(["%02X" % x for x in range(size_to_send)]))
447    cans.pair(isocan)
448    cans.pair(acks)
449    isocan.pair(acks)
450    s.send(dhex(data_str))
451    pkts = cans.sniff(timeout=1, count=1)
452    if not len(pkts):
453        s.failure_analysis()
454        raise Scapy_Exception("ERROR")
455    can = pkts[0]
456    assert can.identifier == 0x641
457    assert can.data == dhex("10 {} {}".format("%02X" % size_to_send, " ".join(["%02X" % x for x in range(max_pl_size)])))
458    pkts = cans.sniff(timeout=1, count=1)
459    if not len(pkts):
460        s.failure_analysis()
461        raise Scapy_Exception("ERROR")
462    can = pkts[0]
463    assert can.identifier == 0x241
464    assert can.data == dhex("30 00 00")
465    pkts = cans.sniff(timeout=1, count=1)
466    if not len(pkts):
467        s.failure_analysis()
468        raise Scapy_Exception("ERROR")
469    can = pkts[0]
470    assert can.identifier == 0x641
471    assert can.data == dhex("21 {}".format(" ".join(["%02X" % x for x in range(max_pl_size, size_to_send)])))
472
473thread.join(15)
474acks.close()
475assert not thread.is_alive()
476
477= Send two-frame ISOTP message with bs
478
479acks = TestSocket(CAN)
480acker_ready = threading.Event()
481def acker():
482    acker_ready.set()
483    can_pkt = acks.sniff(timeout=1, count=1)
484    acks.send(CAN(identifier = 0x241, data=dhex("30 20 00")))
485
486thread = Thread(target=acker)
487thread.start()
488acker_ready.wait(timeout=5)
489with TestSocket(CAN) as isocan, ISOTPSoftSocket(isocan, tx_id=0x641, rx_id=0x241) as s, TestSocket(CAN) as cans:
490    cans.pair(isocan)
491    cans.pair(acks)
492    isocan.pair(acks)
493    s.send(ISOTP(data=dhex("01 02 03 04 05 06 07 08")))
494    pkts = cans.sniff(timeout=1, count=1)
495    if not len(pkts):
496        s.failure_analysis()
497        raise Scapy_Exception("ERROR")
498    can = pkts[0]
499    assert can.identifier == 0x641
500    assert can.data == dhex("10 08 01 02 03 04 05 06")
501    pkts = cans.sniff(timeout=1, count=1)
502    if not len(pkts):
503        s.failure_analysis()
504        raise Scapy_Exception("ERROR")
505    can = pkts[0]
506    assert can.identifier == 0x241
507    assert can.data == dhex("30 20 00")
508    pkts = cans.sniff(timeout=1, count=1)
509    if not len(pkts):
510        s.failure_analysis()
511        raise Scapy_Exception("ERROR")
512    can = pkts[0]
513    assert can.identifier == 0x641
514    assert can.data == dhex("21 07 08")
515
516thread.join(15)
517acks.close()
518assert not thread.is_alive()
519
520= Send two-frame ISOTP message with bs FD
521
522acks = TestSocket(CANFD)
523acker_ready = threading.Event()
524def acker():
525    acker_ready.set()
526    can_pkt = acks.sniff(timeout=1, count=1)
527    acks.send(CANFD(identifier = 0x241, data=dhex("30 20 00")))
528
529thread = Thread(target=acker)
530thread.start()
531acker_ready.wait(timeout=5)
532with TestSocket(CANFD) as isocan, ISOTPSoftSocket(isocan, tx_id=0x641, rx_id=0x241, fd=True) as s, TestSocket(CANFD) as cans:
533    size_to_send = 124
534    max_pl_size = 62
535    data_str = "{}".format(" ".join(["%02X" % x for x in range(size_to_send)]))
536    cans.pair(isocan)
537    cans.pair(acks)
538    isocan.pair(acks)
539    s.send(ISOTP(data=dhex(data_str)))
540    pkts = cans.sniff(timeout=1, count=1)
541    if not len(pkts):
542        s.failure_analysis()
543        raise Scapy_Exception("ERROR")
544    can = pkts[0]
545    assert can.identifier == 0x641
546    assert can.data == dhex("10 {} {}".format("%02X" % size_to_send, " ".join(["%02X" % x for x in range(max_pl_size)])))
547    pkts = cans.sniff(timeout=1, count=1)
548    if not len(pkts):
549        s.failure_analysis()
550        raise Scapy_Exception("ERROR")
551    can = pkts[0]
552    assert can.identifier == 0x241
553    assert can.data == dhex("30 20 00")
554    pkts = cans.sniff(timeout=1, count=1)
555    if not len(pkts):
556        s.failure_analysis()
557        raise Scapy_Exception("ERROR")
558    can = pkts[0]
559    assert can.identifier == 0x641
560    assert can.data == dhex("21 {}".format(" ".join(["%02X" % x for x in range(max_pl_size, size_to_send)])))
561
562thread.join(15)
563acks.close()
564assert not thread.is_alive()
565
566= Send two-frame ISOTP message with ST
567acks = TestSocket(CAN)
568acker_ready = threading.Event()
569def acker():
570    acker_ready.set()
571    acks.sniff(timeout=1, count=1)
572    acks.send(CAN(identifier = 0x241, data=dhex("30 00 10")))
573
574thread = Thread(target=acker)
575thread.start()
576acker_ready.wait(timeout=5)
577with TestSocket(CAN) as isocan, ISOTPSoftSocket(isocan, tx_id=0x641, rx_id=0x241) as s, TestSocket(CAN) as cans:
578    cans.pair(isocan)
579    cans.pair(acks)
580    isocan.pair(acks)
581    s.send(ISOTP(data=dhex("01 02 03 04 05 06 07 08")))
582    pkts = cans.sniff(timeout=1, count=1)
583    if not len(pkts):
584        s.failure_analysis()
585        raise Scapy_Exception("ERROR")
586    can = pkts[0]
587    assert can.identifier == 0x641
588    assert can.data == dhex("10 08 01 02 03 04 05 06")
589    pkts = cans.sniff(timeout=1, count=1)
590    if not len(pkts):
591        s.failure_analysis()
592        raise Scapy_Exception("ERROR")
593    can = pkts[0]
594    assert can.identifier == 0x241
595    assert can.data == dhex("30 00 10")
596    pkts = cans.sniff(timeout=1, count=1)
597    if not len(pkts):
598        s.failure_analysis()
599        raise Scapy_Exception("ERROR")
600    can = pkts[0]
601    assert can.identifier == 0x641
602    assert can.data == dhex("21 07 08")
603
604thread.join(15)
605acks.close()
606assert not thread.is_alive()
607
608= Send two-frame ISOTP message with ST FD
609acks = TestSocket(CANFD)
610acker_ready = threading.Event()
611def acker():
612    acker_ready.set()
613    acks.sniff(timeout=1, count=1)
614    acks.send(CANFD(identifier = 0x241, data=dhex("30 00 10")))
615
616thread = Thread(target=acker)
617thread.start()
618acker_ready.wait(timeout=5)
619with TestSocket(CANFD) as isocan, ISOTPSoftSocket(isocan, tx_id=0x641, rx_id=0x241, fd=True) as s, TestSocket(CANFD) as cans:
620    size_to_send = 124
621    max_pl_size = 62
622    data_str = "{}".format(" ".join(["%02X" % x for x in range(size_to_send)]))
623    cans.pair(isocan)
624    cans.pair(acks)
625    isocan.pair(acks)
626    s.send(dhex(data_str))
627    pkts = cans.sniff(timeout=1, count=1)
628    if not len(pkts):
629        s.failure_analysis()
630        raise Scapy_Exception("ERROR")
631    can = pkts[0]
632    assert can.identifier == 0x641
633    assert can.data == dhex("10 {} {}".format("%02X" % size_to_send, " ".join(["%02X" % x for x in range(max_pl_size)])))
634    pkts = cans.sniff(timeout=1, count=1)
635    if not len(pkts):
636        s.failure_analysis()
637        raise Scapy_Exception("ERROR")
638    can = pkts[0]
639    assert can.identifier == 0x241
640    assert can.data == dhex("30 00 10")
641    pkts = cans.sniff(timeout=1, count=1)
642    if not len(pkts):
643        s.failure_analysis()
644        raise Scapy_Exception("ERROR")
645    can = pkts[0]
646    assert can.identifier == 0x641
647    assert can.data == dhex("21 {}".format(" ".join(["%02X" % x for x in range(max_pl_size, size_to_send)])))
648
649thread.join(15)
650acks.close()
651assert not thread.is_alive()
652
653= Receive a single frame ISOTP message
654
655with TestSocket(CAN) as isocan, ISOTPSoftSocket(isocan, tx_id=0x641, rx_id=0x241) as s, TestSocket(CAN) as cans:
656    cans.pair(isocan)
657    cans.send(CAN(identifier = 0x241, data = dhex("05 01 02 03 04 05")))
658    pkts = s.sniff(count=1, timeout=2)
659    if not len(pkts):
660        s.failure_analysis()
661        raise Scapy_Exception("ERROR")
662    isotp = pkts[0]
663    assert isotp.data == dhex("01 02 03 04 05")
664    assert isotp.tx_id == 0x641
665    assert isotp.rx_id == 0x241
666    assert isotp.ext_address == None
667    assert isotp.rx_ext_address == None
668
669
670= Receive a single frame ISOTP message, with extended addressing
671
672with TestSocket(CAN) as isocan, ISOTPSoftSocket(isocan, tx_id=0x641, rx_id=0x241, ext_address=0xc0, rx_ext_address=0xea) as s, TestSocket(CAN) as cans:
673    cans.pair(isocan)
674    cans.send(CAN(identifier = 0x241, data = dhex("EA 05 01 02 03 04 05")))
675    pkts = s.sniff(count=1, timeout=2)
676    if not len(pkts):
677        s.failure_analysis()
678        raise Scapy_Exception("ERROR")
679    isotp = pkts[0]
680    assert isotp.data == dhex("01 02 03 04 05")
681    assert isotp.tx_id == 0x641
682    assert isotp.rx_id == 0x241
683    assert isotp.ext_address == 0xc0
684    assert isotp.rx_ext_address == 0xea
685
686
687= Receive frames from CandumpReader
688candump_fd = BytesIO(b'''  vcan0  541   [8]  10 0A DE AD BE EF AA AA
689  vcan0  241   [3]  30 00 00
690  vcan0  541   [5]  21 AA AA AA AA
691  vcan0  541   [8]  10 0A DE AD BE EF AA AA
692  vcan0  541   [8]  10 0A DE AD BE EF AA AA
693  vcan0  241   [3]  30 00 00
694  vcan0  541   [5]  21 AA AA AA AA
695  vcan0  541   [8]  10 0A DE AD BE EF AA AA
696  vcan0  241   [3]  30 00 00
697  vcan0  541   [5]  21 AA AA AA AA
698  vcan0  541   [8]  10 0A DE AD BE EF AA AA
699  vcan0  241   [3]  30 00 00
700  vcan0  541   [5]  21 AA AA AA AA
701  vcan0  541   [8]  10 0A DE AD BE EF AA AA
702  vcan0  241   [3]  30 00 00
703  vcan0  541   [5]  21 AA AA AA AA
704  vcan0  541   [8]  10 0A DE AD BE EF AA AA
705  vcan0  241   [3]  30 00 00
706  vcan0  541   [5]  21 AA AA AA AA''')
707
with ISOTPSoftSocket(CandumpReader(candump_fd), tx_id=0x241, rx_id=0x541, listen_only=True) as s:
    pkts = s.sniff(timeout=2, count=6)
    # Run the diagnostics before asserting on the count; with the assert
    # first, failure_analysis() could never be reached.
    if not len(pkts):
        s.failure_analysis()
        raise Scapy_Exception("ERROR")
    assert len(pkts) == 6
    isotp = pkts[0]
    print(repr(isotp))
    print(hex(isotp.tx_id))
    print(hex(isotp.rx_id))
    assert isotp.data == dhex("DE AD BE EF AA AA AA AA AA AA")
    assert isotp.tx_id == 0x241
    assert isotp.rx_id == 0x541
721
722= Receive frames from CandumpReader with ISOTPSniffer without extended addressing
723candump_fd = BytesIO(b'''  vcan0  541   [8]  10 0A DE AD BE EF AA AA
724  vcan0  241   [3]  30 00 00
725  vcan0  541   [5]  21 AA AA AA AA
726  vcan0  541   [8]  10 0A DE AD BE EF AA AA
727  vcan0  541   [8]  10 0A DE AD BE EF AA AA
728  vcan0  241   [3]  30 00 00
729  vcan0  541   [5]  21 AA AA AA AA
730  vcan0  541   [8]  10 0A DE AD BE EF AA AA
731  vcan0  241   [3]  30 00 00
732  vcan0  541   [5]  21 AA AA AA AA
733  vcan0  541   [8]  10 0A DE AD BE EF AA AA
734  vcan0  241   [3]  30 00 00
735  vcan0  541   [5]  21 AA AA AA AA
736  vcan0  541   [8]  10 0A DE AD BE EF AA AA
737  vcan0  241   [3]  30 00 00
738  vcan0  541   [5]  21 AA AA AA AA
739  vcan0  541   [8]  10 0A DE AD BE EF AA AA
740  vcan0  241   [3]  30 00 00
741  vcan0  541   [5]  21 AA AA AA AA''')
742
pkts = sniff(opened_socket=CandumpReader(candump_fd), session=ISOTPSession(use_ext_address=False), timeout=1)
# This test uses plain sniff(); it creates no ISOTPSoftSocket, so there is
# no socket whose failure_analysis() could be called here.
assert len(pkts) == 6

isotp = pkts[0]
assert isotp.data == dhex("DE AD BE EF AA AA AA AA AA AA")
assert (isotp.rx_id == 0x541)
753
754= Receive frames from CandumpReader with ISOTPSniffer
755* all flow control frames are detected as single frame with extended address
756
757candump_fd = BytesIO(b'''  vcan0  541   [8]  10 0A DE AD BE EF AA AA
758  vcan0  241   [3]  30 00 00
759  vcan0  541   [5]  21 AA AA AA AA
760  vcan0  541   [8]  10 0A DE AD BE EF AA AA
761  vcan0  541   [8]  10 0A DE AD BE EF AA AA
762  vcan0  241   [3]  30 00 00
763  vcan0  541   [5]  21 AA AA AA AA
764  vcan0  541   [8]  10 0A DE AD BE EF AA AA
765  vcan0  241   [3]  30 00 00
766  vcan0  541   [5]  21 AA AA AA AA
767  vcan0  541   [8]  10 0A DE AD BE EF AA AA
768  vcan0  241   [3]  30 00 00
769  vcan0  541   [5]  21 AA AA AA AA
770  vcan0  541   [8]  10 0A DE AD BE EF AA AA
771  vcan0  241   [3]  30 00 00
772  vcan0  541   [5]  21 AA AA AA AA
773  vcan0  541   [8]  10 0A DE AD BE EF AA AA
774  vcan0  241   [3]  30 00 00
775  vcan0  541   [5]  21 AA AA AA AA''')
776
pkts = sniff(opened_socket=CandumpReader(candump_fd), session=ISOTPSession, timeout=1)
# This test uses plain sniff(); it creates no ISOTPSoftSocket, so there is
# no socket whose failure_analysis() could be called here.
assert len(pkts) == 12

# pkts[1] is the reassembled two-frame message from 0x541
isotp = pkts[1]
assert isotp.data == dhex("DE AD BE EF AA AA AA AA AA AA")
assert (isotp.rx_id == 0x541)
# pkts[0] is the 0x241 flow-control frame, reported with empty data
isotp = pkts[0]
assert isotp.data == dhex("")
assert (isotp.rx_id == 0x241)
789
790= Receive frames from CandumpReader with ISOTPSniffer and count
791* all flow control frames are detected as single frame with extended address
792
793candump_fd = BytesIO(b'''  vcan0  541   [8]  10 0A DE AD BE EF AA AA
794  vcan0  241   [3]  30 00 00
795  vcan0  541   [5]  21 AA AA AA AA
796  vcan0  541   [8]  10 0A DE AD BE EF AA AA
797  vcan0  541   [8]  10 0A DE AD BE EF AA AA
798  vcan0  241   [3]  30 00 00
799  vcan0  541   [5]  21 AA AA AA AA
800  vcan0  541   [8]  10 0A DE AD BE EF AA AA
801  vcan0  241   [3]  30 00 00
802  vcan0  541   [5]  21 AA AA AA AA
803  vcan0  541   [8]  10 0A DE AD BE EF AA AA
804  vcan0  241   [3]  30 00 00
805  vcan0  541   [5]  21 AA AA AA AA
806  vcan0  541   [8]  10 0A DE AD BE EF AA AA
807  vcan0  241   [3]  30 00 00
808  vcan0  541   [5]  21 AA AA AA AA
809  vcan0  541   [8]  10 0A DE AD BE EF AA AA
810  vcan0  241   [3]  30 00 00
811  vcan0  541   [5]  21 AA AA AA AA''')
812
pkts = sniff(opened_socket=CandumpReader(candump_fd), session=ISOTPSession, timeout=1, count=2)
# This test uses plain sniff(); it creates no ISOTPSoftSocket, so there is
# no socket whose failure_analysis() could be called here.
assert len(pkts) == 2

# pkts[1] is the reassembled two-frame message from 0x541
isotp = pkts[1]
assert isotp.data == dhex("DE AD BE EF AA AA AA AA AA AA")
assert (isotp.rx_id == 0x541)
# pkts[0] is the 0x241 flow-control frame, reported with empty data
isotp = pkts[0]
assert isotp.data == dhex("")
assert (isotp.rx_id == 0x241)
825
826= Receive a two-frame ISOTP message
827
# A first frame announcing 0x0B bytes plus one consecutive frame must be
# reassembled by the soft socket into a single ISOTP message.
with TestSocket(CAN) as isocan, \
        ISOTPSoftSocket(isocan, tx_id=0x641, rx_id=0x241) as s, \
        TestSocket(CAN) as cans:
    cans.pair(isocan)
    for frame_hex in ("10 0B 01 02 03 04 05 06", "21 07 08 09 10 11"):
        cans.send(CAN(identifier=0x241, data=dhex(frame_hex)))
    pkts = s.sniff(count=1, timeout=1)
    if len(pkts) == 0:
        # Nothing was received: dump the socket state before failing.
        s.failure_analysis()
        raise Scapy_Exception("ERROR")
    isotp = pkts[0]
    assert isotp.data == dhex("01 02 03 04 05 06 07 08 09 10 11")
838
839= Check what happens when a CAN frame with wrong identifier gets received
840
# A frame whose identifier (0x141) differs from the socket's rx_id (0x241)
# must not end up in the receive queue.
with TestSocket(CAN) as isocan, \
        ISOTPSoftSocket(isocan, tx_id=0x641, rx_id=0x241) as s, \
        TestSocket(CAN) as cans:
    cans.pair(isocan)
    foreign_frame = CAN(identifier=0x141, data=dhex("05 01 02 03 04 05"))
    cans.send(foreign_frame)
    assert s.ins.rx_queue.empty()
845
846+ Testing ISOTPSoftSocket timeouts
847
848= Check if not sending the last CF will make the socket timeout
# The first frame announces 0x11 bytes, but only one consecutive frame follows,
# so the message stays incomplete and nothing may be delivered.
with TestSocket(CAN) as isocan, \
        ISOTPSoftSocket(isocan, tx_id=0x641, rx_id=0x241) as s, \
        TestSocket(CAN) as cans:
    cans.pair(isocan)
    for frame_hex in ("10 11 01 02 03 04 05 06", "21 07 08 09 0A 0B 0C 0D"):
        cans.send(CAN(identifier=0x241, data=dhex(frame_hex)))
    isotp = s.sniff(timeout=0.1)

assert not len(isotp)
856
857= Check if not sending the first CF will make the socket timeout
858
# Only the first frame is sent; without any consecutive frame the socket must
# not deliver a message.
with TestSocket(CAN) as isocan, \
        ISOTPSoftSocket(isocan, tx_id=0x641, rx_id=0x241) as s, \
        TestSocket(CAN) as cans:
    cans.pair(isocan)
    first_frame = CAN(identifier=0x241, data=dhex("10 11 01 02 03 04 05 06"))
    cans.send(first_frame)
    isotp = s.sniff(timeout=0.1)

assert not len(isotp)
865
866= Check if not sending the first FC will make the socket timeout
867
# Drain log_stream. StringIO.getvalue() only reads the buffer, so the stream
# has to be rewound and truncated explicitly; otherwise output logged by
# earlier tests could satisfy the assertion below.
log_stream.seek(0)
log_stream.truncate(0)

# Ten bytes do not fit into a single frame, so the sender has to wait for a
# flow-control frame after its first frame.
isotp = ISOTP(data=dhex("01 02 03 04 05 06 07 08 09 0A"))

with TestSocket(CAN) as isocan, ISOTPSoftSocket(isocan, tx_id=0x641, rx_id=0x241) as s, TestSocket(CAN) as cans:
    cans.pair(isocan)
    s.send(isotp)
    # Nobody answers with a flow-control frame; give the sender time to give up.
    time.sleep(1.3)

assert "TX state was reset due to timeout" in log_stream.getvalue()
879
880= Check if not sending the second FC will make the socket timeout
881
# Drain log_stream. StringIO.getvalue() only reads the buffer, so the stream
# has to be rewound and truncated explicitly; otherwise the identical timeout
# message logged by an earlier test would make this test pass unconditionally.
log_stream.seek(0)
log_stream.truncate(0)

isotp = ISOTP(data=b"\xa5" * 120)
cans = TestSocket(CAN)
isocan = TestSocket(CAN)
cans.pair(isocan)

# Answer exactly one received frame (count=1) with a flow-control frame
# "30 04 00" (presumably: continue-to-send with block size 4). No further
# flow-control frame is ever sent afterwards.
acker = AsyncSniffer(store=False, opened_socket=cans,
                     prn=lambda x: cans.send(CAN(identifier = 0x241, data=dhex("30 04 00"))),
                     count=1, timeout=1)
acker.start()
with ISOTPSoftSocket(isocan, tx_id=0x641, rx_id=0x241) as s:
    s.send(isotp)
    # Give the sender time to run into its timeout.
    time.sleep(1.3)

acker.join(timeout=5)
cans.close()
isocan.close()

assert "TX state was reset due to timeout" in log_stream.getvalue()
903
904= Check if reception of an overflow FC will make a send fail
905
# Drain log_stream. StringIO.getvalue() only reads the buffer, so the stream
# has to be rewound and truncated explicitly to make sure the assertion below
# only sees output produced by this test.
log_stream.seek(0)
log_stream.truncate(0)

isotp = ISOTP(data=b"\xa5" * 120)
cans = TestSocket(CAN)
isocan = TestSocket(CAN)
cans.pair(isocan)

# Answer exactly one received frame (count=1) with the flow-control frame
# "32 00 00", which the sender is expected to report as a receiver overflow.
acker = AsyncSniffer(store=False, opened_socket=cans,
                     prn=lambda x: cans.send(
                         CAN(identifier = 0x241, data=dhex("32 00 00"))),
                     count=1, timeout=1)
acker.start()

with ISOTPSoftSocket(isocan, tx_id=0x641, rx_id=0x241) as s:
    s.send(isotp)
    # Leave the socket open long enough for the answer to be processed.
    time.sleep(1.3)

acker.join(timeout=5)
cans.close()
isocan.close()

assert "Overflow happened at the receiver side" in log_stream.getvalue()
927
928+ More complex operations
929
930= ISOTPSoftSocket sr1
# The same 12-byte message is used as request and as response.
msg = ISOTP(b'\x11\x22\x33\x11\x22\x33\x11\x22\x33\x11\x22\x33')

# Two soft sockets with mirrored identifiers (0x123/0x321) talk to each other
# over a pair of TestSockets.
with TestSocket(CAN) as isocan_tx, ISOTPSoftSocket(isocan_tx, 0x123, 0x321) as sock_tx, \
    TestSocket(CAN) as isocan_rx, ISOTPSoftSocket(isocan_rx, 0x321, 0x123) as sock_rx:
    isocan_rx.pair(isocan_tx)
    # The responder answers the first message it receives by sending `msg` back.
    sniffer = AsyncSniffer(opened_socket=sock_rx, timeout=1, count=1, prn=lambda x: sock_rx.send(msg))
    sniffer.start()
    # Send the request and wait up to 3 seconds for the answer.
    rx2 = sock_tx.sr1(msg, timeout=3, verbose=True)
    sniffer.join(timeout=1)
    rx = sniffer.results[0]

# rx is the request as seen by the responder, rx2 the answer seen by the requester.
assert rx == msg
assert rx2 is not None
assert rx2 == msg
945
946= ISOTPSoftSocket sniff
947
# Five messages are sent back to back; each one is the previous payload
# extended by one more ASCII digit. All five must be sniffed in order.
msg = ISOTP(b'\x11\x22\x33\x11\x22\x33\x11\x22\x33\x11\x22\x33')
with TestSocket(CAN) as isocan1, \
        ISOTPSoftSocket(isocan1, 0x123, 0x321) as sock, \
        TestSocket(CAN) as isocan, \
        ISOTPSoftSocket(isocan, 0x321, 0x123) as rx_sock:
    isocan1.pair(isocan)
    for tail in (b'0', b'1', b'2', b'3', b'4'):
        msg.data += tail
        sock.send(msg)
    rx = rx_sock.sniff(count=5, timeout=5)

# Rebuild the expected payloads step by step and compare them one by one.
msg = ISOTP(b'\x11\x22\x33\x11\x22\x33\x11\x22\x33\x11\x22\x33')
for idx, tail in enumerate((b'0', b'1', b'2', b'3', b'4')):
    msg.data += tail
    assert rx[idx] == msg
975
976+ ISOTPSoftSocket MITM attack tests
977
978= bridge and sniff with isotp soft sockets set up vcan0 and vcan1 for package forwarding vcan1
succ = False

# Topology: isoTpSocket0 <-> bSocket0 on the first TestSocket pair and
# bSocket1 <-> isoTpSocket1 on the second pair. The bridge forwards between
# bSocket0 and bSocket1. Every socket uses the mirrored identifiers of its
# peer; bSocket1 therefore listens on 0x541, the tx_id of isoTpSocket1
# (it previously listened on its own tx_id 0x141 and could never have
# received anything from its peer).
with TestSocket(CAN) as can0_0, \
        TestSocket(CAN) as can0_1, \
        TestSocket(CAN) as can1_0, \
        TestSocket(CAN) as can1_1, \
        ISOTPSoftSocket(can0_0, tx_id=0x241, rx_id=0x641) as isoTpSocket0, \
        ISOTPSoftSocket(can1_0, tx_id=0x541, rx_id=0x141) as isoTpSocket1, \
        ISOTPSoftSocket(can0_1, tx_id=0x641, rx_id=0x241) as bSocket0, \
        ISOTPSoftSocket(can1_1, tx_id=0x141, rx_id=0x541) as bSocket1:
    can0_0.pair(can0_1)
    can1_1.pair(can1_0)
    # Set by bridge_and_sniff once the bridge is running.
    evt = threading.Event()
    def forwarding(pkt):
        # Count every bridged packet and pass it on unchanged.
        global forwarded
        forwarded += 1
        return pkt
    def bridge():
        # Runs in a worker thread; `succ` records that the bridge returned.
        global forwarded, succ
        forwarded = 0
        bridge_and_sniff(if1=bSocket0, if2=bSocket1, xfrm12=forwarding, xfrm21=forwarding, timeout=1.5,
                         started_callback=evt.set, count=1)
        succ = True
    threadBridge = threading.Thread(target=bridge)
    threadBridge.start()
    evt.wait(timeout=5)
    # Send the request only once the sniffer on the far side is listening.
    packetsVCan1 = isoTpSocket1.sniff(timeout=1.5, count=1, started_callback=lambda: isoTpSocket0.send(ISOTP(b'Request')))
    threadBridge.join(timeout=5)
    assert not threadBridge.is_alive()

assert forwarded == 1
assert len(packetsVCan1) == 1
assert succ
1012
1013= bridge and sniff with isotp soft sockets and multiple long packets
1014
# N: number of messages sent through the bridge; T: timeout in seconds used
# for both the bridge and the sniffer on the far side.
N = 3
T = 3

succ = False
# Topology: isoTpSocket0 <-> bSocket0 on the first TestSocket pair and
# bSocket1 <-> isoTpSocket1 on the second pair; the bridge forwards between
# bSocket0 and bSocket1.
with TestSocket(CAN) as can0_0, \
        TestSocket(CAN) as can0_1, \
        TestSocket(CAN) as can1_0, \
        TestSocket(CAN) as can1_1, \
        ISOTPSoftSocket(can0_0, tx_id=0x241, rx_id=0x641) as isoTpSocket0, \
        ISOTPSoftSocket(can1_0, tx_id=0x541, rx_id=0x141) as isoTpSocket1, \
        ISOTPSoftSocket(can0_1, tx_id=0x641, rx_id=0x241) as bSocket0, \
        ISOTPSoftSocket(can1_1, tx_id=0x141, rx_id=0x541) as bSocket1:
    can0_0.pair(can0_1)
    can1_1.pair(can1_0)
    # Set by bridge_and_sniff once the bridge is running.
    evt = threading.Event()
    def forwarding(pkt):
        # Count every bridged packet and pass it on unchanged.
        global forwarded
        forwarded += 1
        return pkt
    def bridge():
        # Runs in a worker thread; `succ` records that the bridge returned.
        global forwarded, succ
        forwarded = 0
        bridge_and_sniff(if1=bSocket0, if2=bSocket1, xfrm12=forwarding, xfrm21=forwarding,
                         timeout=T, count=N, started_callback=evt.set)
        succ = True
    threadBridge = threading.Thread(target=bridge)
    threadBridge.start()
    evt.wait(timeout=5)
    # The 21-byte payload does not fit into a single CAN frame.
    for _ in range(N):
        isoTpSocket0.send(ISOTP(b'RequestASDF1234567890'))
    packetsVCan1 = isoTpSocket1.sniff(timeout=T, count=N)
    threadBridge.join(timeout=5)

assert not threadBridge.is_alive()

# Every message must have crossed the bridge and reached the far side.
assert forwarded == N
assert len(packetsVCan1) == N
assert succ
1053
1054= bridge and sniff with isotp soft sockets set up vcan0 and vcan1 for package change vcan1
1055
succ = False
# Topology: isoTpSocket0 <-> bSocket0 on the first TestSocket pair and
# bSocket1 <-> isoTpSocket1 on the second pair; the bridge forwards between
# bSocket0 and bSocket1 and rewrites every packet on the way.
with TestSocket(CAN) as can0_0, \
        TestSocket(CAN) as can0_1, \
        TestSocket(CAN) as can1_0, \
        TestSocket(CAN) as can1_1, \
        ISOTPSoftSocket(can0_0, tx_id=0x241, rx_id=0x641) as isoTpSocket0, \
        ISOTPSoftSocket(can1_0, tx_id=0x641, rx_id=0x241) as isoTpSocket1, \
        ISOTPSoftSocket(can0_1, tx_id=0x641, rx_id=0x241) as bSocket0, \
        ISOTPSoftSocket(can1_1, tx_id=0x241, rx_id=0x641) as bSocket1:
    can0_0.pair(can0_1)
    can1_1.pair(can1_0)
    # Set by bridge_and_sniff once the bridge is running.
    evt = threading.Event()
    def forwarding(pkt):
        # Replace the payload of every bridged packet.
        pkt.data = 'changed'
        return pkt
    def bridge():
        # Runs in a worker thread; `succ` records that the bridge returned.
        global succ
        bridge_and_sniff(if1=bSocket0, if2=bSocket1, xfrm12=forwarding, xfrm21=forwarding, timeout=3,
                         started_callback=evt.set, count=1)
        succ = True
    threadBridge = threading.Thread(target=bridge)
    threadBridge.start()
    evt.wait(timeout=5)
    # Send the request only once the sniffer on the far side is listening.
    packetsVCan1 = isoTpSocket1.sniff(timeout=2, count=1, started_callback=lambda: isoTpSocket0.send(ISOTP(b'Request')))
    threadBridge.join(timeout=5)
    assert not threadBridge.is_alive()

# The far side must see the rewritten payload, not the original request.
assert len(packetsVCan1) == 1
assert packetsVCan1[0].data == b'changed'
assert succ
1086
1087= Two ISOTPSoftSockets at the same time, sending and receiving
1088
# An 86-byte message is sent from s2 to s1; both ends are soft sockets, so the
# whole segmented exchange happens between the two of them.
isotp = ISOTP(data=b"\x10\x25" * 43)

with TestSocket(CAN) as cs1, \
        ISOTPSoftSocket(cs1, tx_id=0x641, rx_id=0x241) as s1, \
        TestSocket(CAN) as cs2, \
        ISOTPSoftSocket(cs2, tx_id=0x241, rx_id=0x641) as s2:
    cs1.pair(cs2)
    s2.send(isotp)
    result = s1.sniff(count=1, timeout=5)

assert len(result) == 1
assert result[0].data == isotp.data
1098
1099
1100= Two ISOTPSoftSockets at the same time, sending and receiving with tx_gap
1101
# Same exchange as without a gap, but the receiving socket is created with
# stmin=1.
isotp = ISOTP(data=b"\x10\x25" * 43)

with TestSocket(CAN) as cs1, \
        ISOTPSoftSocket(cs1, tx_id=0x641, rx_id=0x241, stmin=1) as s1, \
        TestSocket(CAN) as cs2, \
        ISOTPSoftSocket(cs2, tx_id=0x241, rx_id=0x641) as s2:
    cs1.pair(cs2)
    s2.send(isotp)
    result = s1.sniff(count=1, timeout=5)

assert len(result) == 1
assert result[0].data == isotp.data
1111
1112
1113= Two ISOTPSoftSockets at the same time, multiple sends/receives
def test():
    """Send eight messages of growing size between two soft sockets.

    The payload lengths are 1, 6, 11, ..., 36 bytes. All eight messages must
    be received; they are printed before the final assertion.
    """
    with TestSocket(CAN) as can_a, ISOTPSoftSocket(can_a, tx_id=0x641, rx_id=0x241) as receiver, \
            TestSocket(CAN) as can_b, ISOTPSoftSocket(can_b, tx_id=0x241, rx_id=0x641) as sender:
        can_a.pair(can_b)
        for start in range(1, 40, 5):
            payload = bytearray(range(start, start * 2))
            sender.send(ISOTP(data=payload))
        received = receiver.sniff(count=8, timeout=5)
    print(received)
    for pkt in received:
        print(repr(pkt))
    assert len(received) == 8

test()
1128
1129= Send a single frame ISOTP message with padding
1130
# With padding enabled, a one-byte message must still go out as a CAN frame
# of length 8.
with TestSocket(CAN) as cs1, \
        ISOTPSoftSocket(cs1, tx_id=0x641, rx_id=0x241, padding=True) as s, \
        TestSocket(CAN) as cans:
    cs1.pair(cans)
    s.send(ISOTP(data=dhex("01")))
    pkts = cans.sniff(timeout=1, count=1)
    if len(pkts) == 0:
        # Nothing was received: dump the socket state before failing.
        s.failure_analysis()
        raise Scapy_Exception("ERROR")
    res = pkts[0]
    assert res.length == 8
1141
1142= Send a single frame ISOTP message with padding FD
1143
# For every payload size the padded CAN FD frame must have the expected
# length; both lists are walked in lockstep.
with TestSocket(CANFD) as cs1, \
        ISOTPSoftSocket(cs1, tx_id=0x641, rx_id=0x241, padding=True, fd=True) as s, \
        TestSocket(CANFD) as cans:
    cs1.pair(cans)
    pl_sizes_testings = [1, 5, 7, 8, 9, 12, 15, 17, 20, 21, 27, 35, 40, 46, 50, 62]
    pl_sizes_expected = [8, 8, 8, 12, 12, 16, 20, 20, 24, 24, 32, 48, 48, 48, 64, 64]
    for pl_size, expected_length in zip(pl_sizes_testings, pl_sizes_expected):
        # Payload bytes are 00, 01, ..., pl_size - 1.
        s.send(bytes(range(pl_size)))
        pkts = cans.sniff(timeout=1, count=1)
        if len(pkts) == 0:
            # Nothing was received: dump the socket state before failing.
            s.failure_analysis()
            raise Scapy_Exception("ERROR")
        res = pkts[0]
        assert res.length == expected_length
1157
1158
1159= Send a two-frame ISOTP message with padding
1160
# acks answers the first frame with a flow-control frame, cans records all
# traffic. Both are paired with each other and with the socket under test.
acks = TestSocket(CAN)
cans = TestSocket(CAN)
acks.pair(cans)

def send_ack(x):
    # Reply to any received frame with the flow-control frame "30 00 00".
    acks.send(CAN(identifier = 0x241, data=dhex("30 00 00")))

# count=1: only the first received frame is acknowledged.
acker = AsyncSniffer(opened_socket=acks, store=False, prn=send_ack, timeout=1, count=1)
acker.start()

with TestSocket(CAN) as isocan, ISOTPSoftSocket(isocan, tx_id=0x641, rx_id=0x241, padding=True) as s:
    acks.pair(isocan)
    cans.pair(isocan)
    s.send(ISOTP(data=dhex("01 02 03 04 05 06 07 08")))
    canpks = cans.sniff(timeout=1, count=3)

acker.join(timeout=5)
# Close the helper sockets explicitly, as the other tests that create
# TestSockets outside of a with-statement do, instead of leaving them open
# until the final cleanup.
acks.close()
cans.close()

# Sorting by identifier puts the flow-control frame (0x241) first and keeps
# the two 0x641 frames in their original order (list sort is stable).
canpks.sort(key=lambda x:x.identifier)
assert canpks[1].identifier == 0x641
assert canpks[1].data == dhex("10 08 01 02 03 04 05 06")
assert canpks[0].identifier == 0x241
assert canpks[0].data == dhex("30 00 00")
assert canpks[2].identifier == 0x641
# The consecutive frame is filled up to 8 bytes with 0xCC.
assert canpks[2].data == dhex("21 07 08 CC CC CC CC CC")
1185
1186
1187= Receive a padded single frame ISOTP message with padding disabled
# A padded single frame announcing two payload bytes must yield exactly those
# two bytes, even though the socket was created with padding=False.
with TestSocket(CAN) as isocan, \
        ISOTPSoftSocket(isocan, tx_id=0x641, rx_id=0x241, padding=False) as s, \
        TestSocket(CAN) as cans:
    cans.pair(isocan)
    padded_frame = CAN(identifier=0x241, data=dhex("02 05 06 00 00 00 00 00"))
    cans.send(padded_frame)
    pkts = s.sniff(count=1, timeout=1)
    if len(pkts) == 0:
        # Nothing was received: dump the socket state before failing.
        s.failure_analysis()
        raise Scapy_Exception("ERROR")
    res = pkts[0]
    assert res.data == dhex("05 06")
1198
1199
1200= Receive a padded single frame ISOTP message with padding enabled
# A padded single frame announcing two payload bytes must yield exactly those
# two bytes when the socket was created with padding=True.
with TestSocket(CAN) as isocan, \
        ISOTPSoftSocket(isocan, tx_id=0x641, rx_id=0x241, padding=True) as s, \
        TestSocket(CAN) as cans:
    cans.pair(isocan)
    padded_frame = CAN(identifier=0x241, data=dhex("02 05 06 00 00 00 00 00"))
    cans.send(padded_frame)
    pkts = s.sniff(count=1, timeout=1)
    if len(pkts) == 0:
        # Nothing was received: dump the socket state before failing.
        s.failure_analysis()
        raise Scapy_Exception("ERROR")
    res = pkts[0]
    assert res.data == dhex("05 06")
1211
1212
1213= Receive a non-padded single frame ISOTP message with padding enabled
# A single frame without padding bytes must still be accepted by a socket
# that was created with padding=True.
with TestSocket(CAN) as isocan, \
        ISOTPSoftSocket(isocan, tx_id=0x641, rx_id=0x241, padding=True) as s, \
        TestSocket(CAN) as cans:
    cans.pair(isocan)
    short_frame = CAN(identifier=0x241, data=dhex("02 05 06"))
    cans.send(short_frame)
    pkts = s.sniff(count=1, timeout=2)
    if len(pkts) == 0:
        # Nothing was received: dump the socket state before failing.
        s.failure_analysis()
        raise Scapy_Exception("ERROR")
    res = pkts[0]
    assert res.data == dhex("05 06")
1224
1225
1226= Receive a padded two-frame ISOTP message with padding enabled
# The first frame announces nine payload bytes; the padding bytes at the end
# of the consecutive frame must not show up in the reassembled message.
with TestSocket(CAN) as isocan, \
        ISOTPSoftSocket(isocan, tx_id=0x641, rx_id=0x241, padding=True) as s, \
        TestSocket(CAN) as cans:
    cans.pair(isocan)
    for frame_hex in ("10 09 01 02 03 04 05 06", "21 07 08 09 00 00 00 00"):
        cans.send(CAN(identifier=0x241, data=dhex(frame_hex)))
    pkts = s.sniff(count=1, timeout=1)
    if len(pkts) == 0:
        # Nothing was received: dump the socket state before failing.
        s.failure_analysis()
        raise Scapy_Exception("ERROR")
    res = pkts[0]
    assert res.data == dhex("01 02 03 04 05 06 07 08 09")
1238
1239= Receive a padded two-frame ISOTP message with padding disabled
# Same padded two-frame message as with padding enabled, but received by a
# socket created with padding=False; the result must be the same nine bytes.
with TestSocket(CAN) as isocan, \
        ISOTPSoftSocket(isocan, tx_id=0x641, rx_id=0x241, padding=False) as s, \
        TestSocket(CAN) as cans:
    cans.pair(isocan)
    for frame_hex in ("10 09 01 02 03 04 05 06", "21 07 08 09 00 00 00 00"):
        cans.send(CAN(identifier=0x241, data=dhex(frame_hex)))
    pkts = s.sniff(count=1, timeout=1)
    if len(pkts) == 0:
        # Nothing was received: dump the socket state before failing.
        s.failure_analysis()
        raise Scapy_Exception("ERROR")
    res = pkts[0]
    # Print the received message to the test output.
    res.show()
    assert res.data == dhex("01 02 03 04 05 06 07 08 09")
1252
1253+ Cleanup
1254
1255= Delete testsockets
1256
# Close every TestSocket that is still open.
cleanup_testsockets()

TimeoutScheduler.clear()

# The capture handler was attached to both log_runtime and log_isotp at the
# top of this campaign; detach it from both so that no logger keeps writing
# into log_stream after these tests have finished.
log_runtime.removeHandler(handler)
log_isotp.removeHandler(handler)
1262