• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1% Regression tests for ISOTPNativeSocket
2~ automotive_comm
3
4+ Configuration
5~ conf
6
7= Imports
8
9with open(scapy_path("test/contrib/automotive/interface_mockup.py")) as f:
10    exec(f.read())
11
12= Definition of constants, utility functions and mock classes
13
14# hexadecimal to bytes convenience function
15dhex = bytes.fromhex
16
17+ Compatibility with can-isotp linux kernel modules
18
19= Compatibility with isotpsend
20exit_if_no_isotp_module()
21
22message = "01 02 03 04 05 06 07 08 09 0A 0B 0C 0D 0E 0F 10 11 12 13 14"
23
24with new_can_socket0() as isocan, ISOTPSoftSocket(isocan, tx_id=0x642, rx_id=0x242) as s:
25    p = subprocess.Popen(["isotpsend", "-s", "242", "-d", "642", iface0], stdin=subprocess.PIPE, universal_newlines=True)
26    p.communicate(message)
27    r = p.returncode
28    assert r == 0
29    isotp = s.recv()
30    assert isotp.data == dhex(message)
31
32
33= Compatibility with isotpsend - extended addresses
34exit_if_no_isotp_module()
35message = "01 02 03 04 05 06 07 08 09 0A 0B 0C 0D 0E 0F 10 11 12 13 14"
36
37with new_can_socket0() as isocan, ISOTPSoftSocket(isocan, tx_id=0x644, rx_id=0x244, ext_address=0xaa, rx_ext_address=0xee) as s:
38    p = subprocess.Popen(["isotpsend", "-s", "244", "-d", "644", "-x", "ee:aa", iface0], stdin=subprocess.PIPE, universal_newlines=True)
39    p.communicate(message)
40    r = p.returncode
41    assert r == 0
42    isotp = s.recv()
43    assert isotp.data == dhex(message)
44
45
46= Compatibility with isotprecv
47exit_if_no_isotp_module()
48
49isotp = ISOTP(data=bytearray(range(1,20)))
50p = subprocess.Popen(["isotprecv", "-s", "243", "-d", "643", "-b", "3", iface0], stdout=subprocess.PIPE)
51time.sleep(0.1)
52with new_can_socket0() as isocan, ISOTPSoftSocket(isocan, tx_id=0x643, rx_id=0x243) as s:
53    s.send(isotp)
54
55timer = threading.Timer(1, lambda: p.terminate() if p.poll() else p.wait())
56timer.start()  # Timeout the receiver after 1 second
57r = p.wait()
58assert 0 == r
59
60result = None
61for i in range(10):
62    time.sleep(0.1)
63    if p.poll() is not None:
64        result = p.stdout.readline().decode().strip()
65        break
66
67assert result is not None
68result_data = dhex(result)
69assert result_data == isotp.data
70
71timer.join(5)
72assert not timer.is_alive()
73
74
75= Compatibility with isotprecv - extended addresses
76exit_if_no_isotp_module()
77isotp = ISOTP(data=bytearray(range(1,20)))
78cmd = ["isotprecv", "-s245", "-d645", "-b3", "-x", "ee:aa", iface0]
79p = subprocess.Popen(cmd, stdout=subprocess.PIPE)
80time.sleep(0.1)  # Give some time for starting reception
81with new_can_socket0() as isocan, ISOTPSoftSocket(isocan, tx_id=0x645, rx_id=0x245, ext_address=0xaa, rx_ext_address=0xee) as s:
82    s.send(isotp)
83
84timer = threading.Timer(1, lambda: p.terminate() if p.poll() else p.wait())
85timer.start()  # Timeout the receiver after 1 second
86r = p.wait()
87assert 0 == r
88
89result = None
90for i in range(10):
91    time.sleep(0.1)
92    if p.poll() is not None:
93        result = p.stdout.readline().decode().strip()
94        break
95
96assert result is not None
97result_data = dhex(result)
98assert result_data == isotp.data
99
100timer.join(5)
101assert not timer.is_alive()
102
103= Compatibility ISOTPSoftSocket ISOTPNativeSocket various configs
104exit_if_no_isotp_module()
105
106message = "01 02 03 04 05 06 07 08 09 0A 0B 0C 0D 0E 0F 10 11 12 13 14" * 5
107
108kwargs = [({"tx_id": 0x242, "rx_id": 0x642, "bs": 0, "stmin": 0, "padding": False, "ext_address": None, "rx_ext_address": None},
109           {"tx_id": 0x642, "rx_id": 0x242, "bs": 0, "stmin": 0, "padding": False, "ext_address": None, "rx_ext_address": None}),
110          ({"tx_id": 0x242, "rx_id": 0x642, "bs": 2, "stmin": 0, "padding": False, "ext_address": None, "rx_ext_address": None},
111           {"tx_id": 0x642, "rx_id": 0x242, "bs": 0, "stmin": 0, "padding": False, "ext_address": None, "rx_ext_address": None}),
112          ({"tx_id": 0x242, "rx_id": 0x642, "bs": 5, "stmin": 0, "padding": False, "ext_address": None, "rx_ext_address": None},
113           {"tx_id": 0x642, "rx_id": 0x242, "bs": 0, "stmin": 0, "padding": False, "ext_address": None, "rx_ext_address": None}),
114          ({"tx_id": 0x242, "rx_id": 0x642, "bs": 0, "stmin": 5, "padding": False, "ext_address": None, "rx_ext_address": None},
115           {"tx_id": 0x642, "rx_id": 0x242, "bs": 0, "stmin": 0, "padding": False, "ext_address": None, "rx_ext_address": None}),
116          ({"tx_id": 0x242, "rx_id": 0x642, "bs": 0, "stmin": 10, "padding": False, "ext_address": None, "rx_ext_address": None},
117           {"tx_id": 0x642, "rx_id": 0x242, "bs": 0, "stmin": 0, "padding": False, "ext_address": None, "rx_ext_address": None}),
118          ({"tx_id": 0x242, "rx_id": 0x642, "bs": 4, "stmin": 130, "padding": False, "ext_address": None, "rx_ext_address": None},
119           {"tx_id": 0x642, "rx_id": 0x242, "bs": 0, "stmin": 0, "padding": False, "ext_address": None, "rx_ext_address": None}),
120          ({"tx_id": 0x242, "rx_id": 0x642, "bs": 3, "stmin": 0, "padding": True, "ext_address": None, "rx_ext_address": None},
121           {"tx_id": 0x642, "rx_id": 0x242, "bs": 0, "stmin": 0, "padding": True, "ext_address": None, "rx_ext_address": None}),
122          ({"tx_id": 0x242, "rx_id": 0x642, "bs": 0, "stmin": 0, "padding": False, "ext_address": 0xfe, "rx_ext_address": None},
123           {"tx_id": 0x642, "rx_id": 0x242, "bs": 0, "stmin": 0, "padding": False, "ext_address": 0xfe, "rx_ext_address": None}),
124          ({"tx_id": 0x242, "rx_id": 0x642, "bs": 0, "stmin": 0, "padding": False, "ext_address": 0xfe, "rx_ext_address": 0xef},
125           {"tx_id": 0x642, "rx_id": 0x242, "bs": 0, "stmin": 0, "padding": False, "ext_address": 0xef, "rx_ext_address": 0xfe}),
126          ({"tx_id": 0x242, "rx_id": 0x642, "bs": 6, "stmin": 10, "padding": True, "ext_address": 0x12, "rx_ext_address": 0x23},
127           {"tx_id": 0x642, "rx_id": 0x242, "bs": 6, "stmin": 5, "padding": True, "ext_address": 0x23, "rx_ext_address": 0x12}),
128          ({"tx_id": 0x242, "rx_id": 0x642, "bs": 0, "stmin": 0, "padding": True, "ext_address": 0x45, "rx_ext_address": None},
129           {"tx_id": 0x642, "rx_id": 0x242, "bs": 0, "stmin": 40, "padding": True, "ext_address": 0x45, "rx_ext_address": None}),
130          ({"tx_id": 0x123, "rx_id": 0x642, "bs": 1, "stmin": 1, "padding": False, "ext_address": None, "rx_ext_address": None},
131           {"tx_id": 0x642, "rx_id": 0x123, "bs": 1, "stmin": 1, "padding": False, "ext_address": None, "rx_ext_address": None}),]
132
133for kwargs1, kwargs2 in kwargs:
134    print("Testing config %s, %s" % (kwargs1, kwargs2))
135    with NativeCANSocket(iface0) as cs:
136        cs.sniff(timeout=0.01)
137        with ISOTPSoftSocket(iface0, **kwargs1) as s, ISOTPNativeSocket(iface0, **kwargs2) as ns:
138            ns.send(ISOTP(bytes.fromhex(message)))
139            isotp = s.recv()
140            assert isotp.data == dhex(message)
141            ns.send(ISOTP(bytes.fromhex("00 11 22")))
142            isotp = s.recv()
143            assert (isotp.data == dhex("00 11 22"))
144            pks1 = cs.sniff(timeout=0.01)
145        with ISOTPNativeSocket(iface0, **kwargs1) as s, ISOTPSoftSocket(iface0, **kwargs2) as ns:
146            ns.send(ISOTP(bytes.fromhex(message)))
147            isotp = s.recv()
148            assert isotp.data == dhex(message)
149            ns.send(ISOTP(bytes.fromhex("00 11 22")))
150            isotp = s.recv()
151            assert (isotp.data == dhex("00 11 22"))
152            pks2 = cs.sniff(timeout=0.01)
153        assert len(pks1) == len(pks2) and len(pks2) > 0
154        for p1, p2 in zip(pks1, pks2):
155            assert bytes(p1) == bytes(p2)
156
157
158+ ISOTPNativeSocket tests
159
160= Create ISOTP socket
161exit_if_no_isotp_module()
162s = ISOTPNativeSocket(iface0, tx_id=0x641, rx_id=0x241)
163
164= Send single frame ISOTP message
165exit_if_no_isotp_module()
166
167with new_can_socket(iface0) as cans:
168    s = ISOTPNativeSocket(iface0, tx_id=0x641, rx_id=0x241)
169    s.send(ISOTP(data=dhex("01 02 03 04 05")))
170    can = cans.sniff(timeout=1, count=1)[0]
171    assert can.identifier == 0x641
172    assert can.data == dhex("05 01 02 03 04 05")
173
174
175= Send single frame ISOTP message Test init with CANSocket
176exit_if_no_isotp_module()
177cans = CANSocket(iface0)
178s = ISOTPNativeSocket(cans, tx_id=0x641, rx_id=0x241)
179s.send(ISOTP(data=dhex("01 02 03 04 05")))
180can = cans.sniff(timeout=1, count=1)[0]
181assert can.identifier == 0x641
182assert can.data == dhex("05 01 02 03 04 05")
183cans.close()
184
185
186= Test init with wrong type
187exit_if_no_isotp_module()
188exception_catched = False
189try:
190    s = ISOTPNativeSocket(42, tx_id=0x641, rx_id=0x241)
191except Scapy_Exception:
192    exception_catched = True
193
194assert exception_catched
195
196= Send two-frame ISOTP message
197exit_if_no_isotp_module()
198
199evt = threading.Event()
200def acker():
201    with new_can_socket(iface0) as cans:
202        evt.set()
203        can = cans.sniff(timeout=1, count=1)[0]
204        cans.send(CAN(identifier = 0x241, data=dhex("30 00 00")))
205
206
207with new_can_socket(iface0) as cans:
208    t = Thread(target=acker)
209    t.start()
210    s = ISOTPNativeSocket(iface0, tx_id=0x641, rx_id=0x241)
211    evt.wait(timeout=5)
212    s.send(ISOTP(data=dhex("01 02 03 04 05 06 07 08")))
213    can = cans.sniff(timeout=1, count=1)[0]
214    assert can.identifier == 0x641
215    assert can.data == dhex("10 08 01 02 03 04 05 06")
216    can = cans.sniff(timeout=1, count=1)[0]
217    assert can.identifier == 0x241
218    assert can.data == dhex("30 00 00")
219    can = cans.sniff(timeout=1, count=1)[0]
220    assert can.identifier == 0x641
221    assert can.data == dhex("21 07 08")
222    t.join(timeout=5)
223    assert not t.is_alive()
224
225= Send a single frame ISOTP message with padding
226exit_if_no_isotp_module()
227s = ISOTPNativeSocket(iface0, tx_id=0x641, rx_id=0x241, padding=True)
228
229with new_can_socket(iface0) as cans:
230    s.send(ISOTP(data=dhex("01")))
231    can = cans.sniff(timeout=1, count=1)[0]
232    assert can.length == 8
233
234
235= Send a two-frame ISOTP message with padding
236exit_if_no_isotp_module()
237
238acker_ready = threading.Event()
239def acker():
240    with new_can_socket(iface0) as acks:
241        acker_ready.set()
242        can = acks.sniff(timeout=1, count=1)[0]
243        acks.send(CAN(identifier = 0x241, data=dhex("30 00 00")))
244
245with new_can_socket(iface0) as cans:
246    thread = Thread(target=acker)
247    thread.start()
248    acker_ready.wait(timeout=5)
249    s = ISOTPNativeSocket(iface0, tx_id=0x641, rx_id=0x241, padding=True)
250    s.send(ISOTP(data=dhex("01 02 03 04 05 06 07 08")))
251    can = cans.sniff(timeout=1, count=1)[0]
252    assert can.identifier == 0x641
253    assert can.data == dhex("10 08 01 02 03 04 05 06")
254    can = cans.sniff(timeout=1, count=1)[0]
255    assert can.identifier == 0x241
256    assert can.data == dhex("30 00 00")
257    can = cans.sniff(timeout=1, count=1)[0]
258    assert can.identifier == 0x641
259    assert can.data == dhex("21 07 08 CC CC CC CC CC")
260    thread.join(5)
261    assert not thread.is_alive()
262
263
264= Receive a padded single frame ISOTP message with padding disabled
265exit_if_no_isotp_module()
266s = ISOTPNativeSocket(iface0, tx_id=0x641, rx_id=0x241, padding=False)
267with new_can_socket(iface0) as cans:
268    cans.send(CAN(identifier=0x241, data=dhex("02 05 06 00 00 00 00 00")))
269    res = s.recv()
270    assert res.data == dhex("05 06")
271
272
273= Receive a padded single frame ISOTP message with padding enabled
274exit_if_no_isotp_module()
275s = ISOTPNativeSocket(iface0, tx_id=0x641, rx_id=0x241, padding=True)
276with new_can_socket(iface0) as cans:
277    cans.send(CAN(identifier=0x241, data=dhex("02 05 06 00 00 00 00 00")))
278    res = s.recv()
279    assert res.data == dhex("05 06")
280
281
282= Receive a non-padded single frame ISOTP message with padding enabled
283exit_if_no_isotp_module()
284s = ISOTPNativeSocket(iface0, tx_id=0x641, rx_id=0x241, padding=True)
285with new_can_socket(iface0) as cans:
286    cans.send(CAN(identifier=0x241, data=dhex("02 05 06")))
287    res = s.recv()
288    assert res.data == dhex("05 06")
289
290
291= Receive a padded two-frame ISOTP message with padding enabled
292exit_if_no_isotp_module()
293s = ISOTPNativeSocket(iface0, tx_id=0x641, rx_id=0x241, padding=True)
294with new_can_socket(iface0) as cans:
295    cans.send(CAN(identifier=0x241, data=dhex("10 09 01 02 03 04 05 06")))
296    cans.send(CAN(identifier=0x241, data=dhex("21 07 08 09 00 00 00 00")))
297    res = s.recv()
298    assert res.data == dhex("01 02 03 04 05 06 07 08 09")
299
300
301= Receive a padded two-frame ISOTP message with padding disabled
302exit_if_no_isotp_module()
303s = ISOTPNativeSocket(iface0, tx_id=0x641, rx_id=0x241, padding=False)
304with new_can_socket(iface0) as cans:
305    cans.send(CAN(identifier=0x241, data=dhex("10 09 01 02 03 04 05 06")))
306    cans.send(CAN(identifier=0x241, data=dhex("21 07 08 09 00 00 00 00")))
307    res = s.recv()
308    assert res.data == dhex("01 02 03 04 05 06 07 08 09")
309
310
311= Receive a single frame ISOTP message
312exit_if_no_isotp_module()
313s = ISOTPNativeSocket(iface0, tx_id=0x641, rx_id=0x241)
314with new_can_socket(iface0) as cans:
315    cans.send(CAN(identifier = 0x241, data = dhex("05 01 02 03 04 05")))
316    isotp = s.recv()
317    assert isotp.data == dhex("01 02 03 04 05")
318    assert isotp.tx_id == 0x641
319    assert isotp.rx_id == 0x241
320    assert isotp.ext_address == None
321    assert isotp.rx_ext_address == None
322
323
324= Receive a single frame ISOTP message, with extended addressing
325exit_if_no_isotp_module()
326s = ISOTPNativeSocket(iface0, tx_id=0x641, rx_id=0x241, ext_address=0xc0, rx_ext_address=0xea)
327with new_can_socket(iface0) as cans:
328    cans.send(CAN(identifier = 0x241, data = dhex("EA 05 01 02 03 04 05")))
329    isotp = s.recv()
330    assert isotp.data == dhex("01 02 03 04 05")
331    assert isotp.tx_id == 0x641
332    assert isotp.rx_id == 0x241
333    assert isotp.ext_address == 0xc0
334    assert isotp.rx_ext_address == 0xea
335
336
337= Receive a two-frame ISOTP message
338exit_if_no_isotp_module()
339s = ISOTPNativeSocket(iface0, tx_id=0x641, rx_id=0x241)
340with new_can_socket(iface0) as cans:
341    cans.send(CAN(identifier = 0x241, data = dhex("10 0B 01 02 03 04 05 06")))
342    cans.send(CAN(identifier = 0x241, data = dhex("21 07 08 09 10 11")))
343    isotp = s.recv()
344    assert isotp.data == dhex("01 02 03 04 05 06 07 08 09 10 11")
345
346= Receive a two-frame ISOTP message and test python with statement
347exit_if_no_isotp_module()
348with ISOTPNativeSocket(iface0, tx_id=0x641, rx_id=0x241) as s:
349    with new_can_socket(iface0) as cans:
350        cans.send(CAN(identifier = 0x241, data = dhex("10 0B 01 02 03 04 05 06")))
351        cans.send(CAN(identifier = 0x241, data = dhex("21 07 08 09 10 11")))
352    isotp = s.recv()
353    assert isotp.data == dhex("01 02 03 04 05 06 07 08 09 10 11")
354
355
356= Send single CANFD frame ISOTP message
357exit_if_no_isotp_module()
358
359with new_can_socket(iface0, fd=True) as cans:
360    s = ISOTPNativeSocket(iface0, tx_id=0x641, rx_id=0x241, fd=True)
361    s.send(ISOTP(data=dhex("01 02 03 04 05 06 07 08 09")))
362    can = cans.sniff(timeout=1, count=1)[0]
363    assert can.identifier == 0x641
364    assert can.data == dhex("09 01 02 03 04 05 06 07 08 09")
365
366
367= ISOTP Socket sr1 test
368exit_if_no_isotp_module()
369
370txSock = ISOTPNativeSocket(iface0, tx_id=0x123, rx_id=0x321, basecls=ISOTP)
371txmsg = ISOTP(b'\x11\x22\x33')
372rx2 = None
373
374receiver_up = Event()
375
376def sender():
377    global receiver_up
378    receiver_up.wait(timeout=5)
379    global txmsg
380    global rx2
381    rx2 = txSock.sr1(txmsg, timeout=1, verbose=True)
382
383def receiver():
384    global receiver_up
385    with new_can_socket(iface0) as cans:
386        rx = cans.sniff(timeout=1, count=1, started_callback=receiver_up.set)[0]
387        cans.send(CAN(identifier=0x321, length=4, data=b'\x03\x7f\x22\x33'))
388    expectedrx = CAN(identifier=0x123, length=4, data=b'\x03\x11\x22\x33')
389    assert rx.length == expectedrx.length
390    assert rx.data == expectedrx.data
391    assert rx.identifier == expectedrx.identifier
392
393txThread = threading.Thread(target=sender)
394txThread.start()
395receiver()
396txThread.join(timeout=5)
397assert not txThread.is_alive()
398
399assert rx2 is not None
400assert rx2 == ISOTP(b'\x7f\x22\x33')
401assert rx2.answers(txmsg)
402
403= ISOTP Socket sr1 and ISOTP test
404exit_if_no_isotp_module()
405txSock = ISOTPNativeSocket(iface0, 0x123, 0x321, basecls=ISOTP)
406rxSock = ISOTPNativeSocket(iface0, 0x321, 0x123, basecls=ISOTP)
407msg = ISOTP(b'\x11\x22\x33\x11\x22\x33\x11\x22\x33\x11\x22\x33')
408rx2 = None
409
410receiver_up = Event()
411
412def sender():
413    receiver_up.wait(timeout=5)
414    global rx2
415    rx2 = txSock.sr1(msg, timeout=1, verbose=True)
416
417def receiver():
418    global rx
419    receiver_up.set()
420    rx = rxSock.recv()
421    rxSock.send(msg)
422
423txThread = threading.Thread(target=sender)
424txThread.start()
425receiver()
426txThread.join(timeout=5)
427assert not txThread.is_alive()
428
429assert rx == msg
430assert rxSock.send(msg)
431assert rx2 is not None
432assert rx2 == msg
433
434= ISOTP Socket sr1 and ISOTP test vice versa
435exit_if_no_isotp_module()
436
437rxSock = ISOTPNativeSocket(iface0, 0x321, 0x123, basecls=ISOTP)
438txSock = ISOTPNativeSocket(iface0, 0x123, 0x321, basecls=ISOTP)
439
440msg = ISOTP(b'\x11\x22\x33\x11\x22\x33\x11\x22\x33\x11\x22\x33')
441
442receiver_up = Event()
443
444def receiver():
445    global rx2, sent
446    rx2 = rxSock.sniff(count=1, timeout=1, started_callback=receiver_up.set)
447    sent = rxSock.send(msg)
448
449def sender():
450    global rx
451    receiver_up.wait(timeout=5)
452    rx = txSock.sr1(msg, timeout=1,verbose=True)
453
454rx2 = None
455sent = False
456rxThread = threading.Thread(target=receiver)
457rxThread.start()
458sender()
459rxThread.join(timeout=5)
460assert not rxThread.is_alive()
461
462assert rx == msg
463assert rx2[0] == msg
464assert sent
465
466= ISOTP Socket sniff
467exit_if_no_isotp_module()
468
469rxSock = ISOTPNativeSocket(iface0, 0x321, 0x123, basecls=ISOTP)
470txSock = ISOTPNativeSocket(iface0, 0x123, 0x321, basecls=ISOTP)
471succ = False
472
473receiver_up = Event()
474
475def receiver():
476    rx = rxSock.sniff(count=5, timeout=1, started_callback=receiver_up.set)
477    msg = ISOTP(b'\x11\x22\x33\x11\x22\x33\x11\x22\x33\x11\x22\x33')
478    msg.data += b'0'
479    assert rx[0] == msg
480    msg.data += b'1'
481    assert rx[1] == msg
482    msg.data += b'2'
483    assert rx[2] == msg
484    msg.data += b'3'
485    assert rx[3] == msg
486    msg.data += b'4'
487    assert rx[4] == msg
488    global succ
489    succ = True
490
491def sender():
492    receiver_up.wait(timeout=5)
493    msg = ISOTP(b'\x11\x22\x33\x11\x22\x33\x11\x22\x33\x11\x22\x33')
494    msg.data += b'0'
495    assert txSock.send(msg)
496    msg.data += b'1'
497    assert txSock.send(msg)
498    msg.data += b'2'
499    assert txSock.send(msg)
500    msg.data += b'3'
501    assert txSock.send(msg)
502    msg.data += b'4'
503    assert txSock.send(msg)
504
505rxThread = threading.Thread(target=receiver)
506rxThread.start()
507sender()
508rxThread.join(timeout=5)
509assert not rxThread.is_alive()
510
511assert succ
512
513+ ISOTPNativeSocket MITM attack tests
514~ vcan_socket needs_root linux
515
516= bridge and sniff with isotp native sockets set up vcan0 and vcan1 for package forwarding vcan1
517exit_if_no_isotp_module()
518
519isoTpSocket0 = ISOTPNativeSocket(iface0, tx_id=0x241, rx_id=0x641)
520isoTpSocket1 = ISOTPNativeSocket(iface1, tx_id=0x641, rx_id=0x241)
521bSocket0 = ISOTPNativeSocket(iface0, tx_id=0x641, rx_id=0x241)
522bSocket1 = ISOTPNativeSocket(iface1, tx_id=0x241, rx_id=0x641)
523
524bridgeStarted = threading.Event()
525def bridge():
526    global bridgeStarted
527    def forwarding(pkt):
528        return pkt
529    bridge_and_sniff(if1=bSocket0, if2=bSocket1, xfrm12=forwarding, xfrm21=forwarding, timeout=2, count=1, started_callback=bridgeStarted.set)
530    bSocket0.close()
531    bSocket1.close()
532    global bSucc
533    bSucc = True
534
535bSucc = False
536
537threadBridge = threading.Thread(target=bridge)
538threadBridge.start()
539bridgeStarted.wait(timeout=5)
540isoTpSocket0.send(ISOTP(b'Request'))
541packetsVCan1 = isoTpSocket1.sniff(timeout=0.5, count=1)
542
543assert len(packetsVCan1) == 1
544
545isoTpSocket0.close()
546isoTpSocket1.close()
547
548threadBridge.join(timeout=5)
549assert not threadBridge.is_alive()
550
551assert bSucc
552
553= bridge and sniff with isotp native sockets set up vcan0 and vcan1 for package change to vcan1
554exit_if_no_isotp_module()
555
556isoTpSocket0 = ISOTPNativeSocket(iface0, tx_id=0x241, rx_id=0x641)
557isoTpSocket1 = ISOTPNativeSocket(iface1, tx_id=0x641, rx_id=0x241)
558bSocket0 = ISOTPNativeSocket(iface0, tx_id=0x641, rx_id=0x241)
559bSocket1 = ISOTPNativeSocket(iface1, tx_id=0x241, rx_id=0x641)
560
561bSucc = False
562
563bridgeStarted = threading.Event()
564def bridge():
565    global bridgeStarted
566    global bSucc
567    def forwarding(pkt):
568        pkt.data = 'changed'
569        return pkt
570    bridge_and_sniff(if1=bSocket0, if2=bSocket1, xfrm12=forwarding, xfrm21=forwarding, timeout=0.5, started_callback=bridgeStarted.set)
571    bSocket0.close()
572    bSocket1.close()
573    bSucc = True
574
575threadBridge = threading.Thread(target=bridge)
576threadBridge.start()
577bridgeStarted.wait(timeout=5)
578isoTpSocket0.send(ISOTP(b'Request'))
579packetsVCan1 = isoTpSocket1.sniff(timeout=0.5, count=1)
580
581packetsVCan1[0].data = b'changed'
582assert len(packetsVCan1) == 1
583
584isoTpSocket0.close()
585isoTpSocket1.close()
586
587threadBridge.join(timeout=5)
588assert not threadBridge.is_alive()
589
590assert bSucc
591
592= bridge and sniff with isotp native sockets set up vcan0 and vcan1 for package forwarding in both directions
593exit_if_no_isotp_module()
594
595bSucc = False
596
597isoTpSocket0 = ISOTPNativeSocket(iface0, tx_id=0x241, rx_id=0x641)
598isoTpSocket1 = ISOTPNativeSocket(iface1, tx_id=0x641, rx_id=0x241)
599bSocket0 = ISOTPNativeSocket(iface0, tx_id=0x641, rx_id=0x241)
600bSocket1 = ISOTPNativeSocket(iface1, tx_id=0x241, rx_id=0x641)
601
602bridgeStarted = threading.Event()
603def bridge():
604    global bridgeStarted
605    global bSucc
606    def forwarding(pkt):
607        return pkt
608    bridge_and_sniff(if1=bSocket0, if2=bSocket1, xfrm12=forwarding, xfrm21=forwarding, timeout=0.5, started_callback=bridgeStarted.set, count=2)
609    bSocket0.close()
610    bSocket1.close()
611    bSucc = True
612
613threadBridge = threading.Thread(target=bridge)
614threadBridge.start()
615bridgeStarted.wait(timeout=5)
616
617packetVcan0 = ISOTP(b'RequestVcan0')
618packetVcan1 = ISOTP(b'RequestVcan1')
619isoTpSocket0.send(packetVcan0)
620isoTpSocket1.send(packetVcan1)
621
622packetsVCan0 = isoTpSocket0.sniff(timeout=0.5, count=1)
623packetsVCan1 = isoTpSocket1.sniff(timeout=0.5, count=1)
624
625len(packetsVCan0) == 1
626len(packetsVCan1) == 1
627
628isoTpSocket0.close()
629isoTpSocket1.close()
630
631threadBridge.join(timeout=5)
632assert not threadBridge.is_alive()
633
634assert bSucc
635
636= bridge and sniff with isotp native sockets set up vcan0 and vcan1 for package change in both directions
637exit_if_no_isotp_module()
638
639bSucc = False
640
641isoTpSocket0 = ISOTPNativeSocket(iface0, tx_id=0x241, rx_id=0x641)
642isoTpSocket1 = ISOTPNativeSocket(iface1, tx_id=0x641, rx_id=0x241)
643bSocket0 = ISOTPNativeSocket(iface0, tx_id=0x641, rx_id=0x241)
644bSocket1 = ISOTPNativeSocket(iface1, tx_id=0x241, rx_id=0x641)
645
646bridgeStarted = threading.Event()
647def bridge():
648    global bridgeStarted
649    global bSucc
650    def forwarding(pkt):
651        pkt.data = 'changed'
652        return pkt
653    bridge_and_sniff(if1=bSocket0, if2=bSocket1, xfrm12=forwarding, xfrm21=forwarding, timeout=0.5, started_callback=bridgeStarted.set, count=2)
654    bSocket0.close()
655    bSocket1.close()
656    bSucc = True
657
658threadBridge = threading.Thread(target=bridge)
659threadBridge.start()
660bridgeStarted.wait(timeout=5)
661
662packetVcan0 = ISOTP(b'RequestVcan0')
663packetVcan1 = ISOTP(b'RequestVcan1')
664isoTpSocket0.send(packetVcan0)
665isoTpSocket1.send(packetVcan1)
666
667packetsVCan0 = isoTpSocket0.sniff(timeout=0.5, count=1)
668packetsVCan1 = isoTpSocket1.sniff(timeout=0.5, count=1)
669
670packetsVCan0[0].data = b'changed'
671assert len(packetsVCan0) == 1
672packetsVCan1[0].data = b'changed'
673assert len(packetsVCan1) == 1
674
675isoTpSocket0.close()
676isoTpSocket1.close()
677
678threadBridge.join(timeout=5)
679assert not threadBridge.is_alive()
680
681assert bSucc
682
683+ Cleanup
684
685= Cleanup reference to ISOTPSoftSocket to let the thread end
686s = None
687
688= Delete vcan interfaces
689
690assert cleanup_interfaces()
691