1% Regression tests for ISOTPNativeSocket 2~ automotive_comm 3 4+ Configuration 5~ conf 6 7= Imports 8 9with open(scapy_path("test/contrib/automotive/interface_mockup.py")) as f: 10 exec(f.read()) 11 12= Definition of constants, utility functions and mock classes 13 14# hexadecimal to bytes convenience function 15dhex = bytes.fromhex 16 17+ Compatibility with can-isotp linux kernel modules 18 19= Compatibility with isotpsend 20exit_if_no_isotp_module() 21 22message = "01 02 03 04 05 06 07 08 09 0A 0B 0C 0D 0E 0F 10 11 12 13 14" 23 24with new_can_socket0() as isocan, ISOTPSoftSocket(isocan, tx_id=0x642, rx_id=0x242) as s: 25 p = subprocess.Popen(["isotpsend", "-s", "242", "-d", "642", iface0], stdin=subprocess.PIPE, universal_newlines=True) 26 p.communicate(message) 27 r = p.returncode 28 assert r == 0 29 isotp = s.recv() 30 assert isotp.data == dhex(message) 31 32 33= Compatibility with isotpsend - extended addresses 34exit_if_no_isotp_module() 35message = "01 02 03 04 05 06 07 08 09 0A 0B 0C 0D 0E 0F 10 11 12 13 14" 36 37with new_can_socket0() as isocan, ISOTPSoftSocket(isocan, tx_id=0x644, rx_id=0x244, ext_address=0xaa, rx_ext_address=0xee) as s: 38 p = subprocess.Popen(["isotpsend", "-s", "244", "-d", "644", "-x", "ee:aa", iface0], stdin=subprocess.PIPE, universal_newlines=True) 39 p.communicate(message) 40 r = p.returncode 41 assert r == 0 42 isotp = s.recv() 43 assert isotp.data == dhex(message) 44 45 46= Compatibility with isotprecv 47exit_if_no_isotp_module() 48 49isotp = ISOTP(data=bytearray(range(1,20))) 50p = subprocess.Popen(["isotprecv", "-s", "243", "-d", "643", "-b", "3", iface0], stdout=subprocess.PIPE) 51time.sleep(0.1) 52with new_can_socket0() as isocan, ISOTPSoftSocket(isocan, tx_id=0x643, rx_id=0x243) as s: 53 s.send(isotp) 54 55timer = threading.Timer(1, lambda: p.terminate() if p.poll() else p.wait()) 56timer.start() # Timeout the receiver after 1 second 57r = p.wait() 58assert 0 == r 59 60result = None 61for i in range(10): 62 time.sleep(0.1) 63 if p.poll() is not None: 64 result = p.stdout.readline().decode().strip() 65 break 66 67assert result is not None 68result_data = dhex(result) 69assert result_data == isotp.data 70 71timer.join(5) 72assert not timer.is_alive() 73 74 75= Compatibility with isotprecv - extended addresses 76exit_if_no_isotp_module() 77isotp = ISOTP(data=bytearray(range(1,20))) 78cmd = ["isotprecv", "-s245", "-d645", "-b3", "-x", "ee:aa", iface0] 79p = subprocess.Popen(cmd, stdout=subprocess.PIPE) 80time.sleep(0.1) # Give some time for starting reception 81with new_can_socket0() as isocan, ISOTPSoftSocket(isocan, tx_id=0x645, rx_id=0x245, ext_address=0xaa, rx_ext_address=0xee) as s: 82 s.send(isotp) 83 84timer = threading.Timer(1, lambda: p.terminate() if p.poll() else p.wait()) 85timer.start() # Timeout the receiver after 1 second 86r = p.wait() 87assert 0 == r 88 89result = None 90for i in range(10): 91 time.sleep(0.1) 92 if p.poll() is not None: 93 result = p.stdout.readline().decode().strip() 94 break 95 96assert result is not None 97result_data = dhex(result) 98assert result_data == isotp.data 99 100timer.join(5) 101assert not timer.is_alive() 102 103= Compatibility ISOTPSoftSocket ISOTPNativeSocket various configs 104exit_if_no_isotp_module() 105 106message = "01 02 03 04 05 06 07 08 09 0A 0B 0C 0D 0E 0F 10 11 12 13 14" * 5 107 108kwargs = [({"tx_id": 0x242, "rx_id": 0x642, "bs": 0, "stmin": 0, "padding": False, "ext_address": None, "rx_ext_address": None}, 109 {"tx_id": 0x642, "rx_id": 0x242, "bs": 0, "stmin": 0, "padding": False, "ext_address": None, "rx_ext_address": None}), 110 ({"tx_id": 0x242, "rx_id": 0x642, "bs": 2, "stmin": 0, "padding": False, "ext_address": None, "rx_ext_address": None}, 111 {"tx_id": 0x642, "rx_id": 0x242, "bs": 0, "stmin": 0, "padding": False, "ext_address": None, "rx_ext_address": None}), 112 ({"tx_id": 0x242, "rx_id": 0x642, "bs": 5, "stmin": 0, "padding": False, "ext_address": None, "rx_ext_address": None}, 113 {"tx_id": 0x642, "rx_id": 0x242, "bs": 0, "stmin": 0, "padding": False, "ext_address": None, "rx_ext_address": None}), 114 ({"tx_id": 0x242, "rx_id": 0x642, "bs": 0, "stmin": 5, "padding": False, "ext_address": None, "rx_ext_address": None}, 115 {"tx_id": 0x642, "rx_id": 0x242, "bs": 0, "stmin": 0, "padding": False, "ext_address": None, "rx_ext_address": None}), 116 ({"tx_id": 0x242, "rx_id": 0x642, "bs": 0, "stmin": 10, "padding": False, "ext_address": None, "rx_ext_address": None}, 117 {"tx_id": 0x642, "rx_id": 0x242, "bs": 0, "stmin": 0, "padding": False, "ext_address": None, "rx_ext_address": None}), 118 ({"tx_id": 0x242, "rx_id": 0x642, "bs": 4, "stmin": 130, "padding": False, "ext_address": None, "rx_ext_address": None}, 119 {"tx_id": 0x642, "rx_id": 0x242, "bs": 0, "stmin": 0, "padding": False, "ext_address": None, "rx_ext_address": None}), 120 ({"tx_id": 0x242, "rx_id": 0x642, "bs": 3, "stmin": 0, "padding": True, "ext_address": None, "rx_ext_address": None}, 121 {"tx_id": 0x642, "rx_id": 0x242, "bs": 0, "stmin": 0, "padding": True, "ext_address": None, "rx_ext_address": None}), 122 ({"tx_id": 0x242, "rx_id": 0x642, "bs": 0, "stmin": 0, "padding": False, "ext_address": 0xfe, "rx_ext_address": None}, 123 {"tx_id": 0x642, "rx_id": 0x242, "bs": 0, "stmin": 0, "padding": False, "ext_address": 0xfe, "rx_ext_address": None}), 124 ({"tx_id": 0x242, "rx_id": 0x642, "bs": 0, "stmin": 0, "padding": False, "ext_address": 0xfe, "rx_ext_address": 0xef}, 125 {"tx_id": 0x642, "rx_id": 0x242, "bs": 0, "stmin": 0, "padding": False, "ext_address": 0xef, "rx_ext_address": 0xfe}), 126 ({"tx_id": 0x242, "rx_id": 0x642, "bs": 6, "stmin": 10, "padding": True, "ext_address": 0x12, "rx_ext_address": 0x23}, 127 {"tx_id": 0x642, "rx_id": 0x242, "bs": 6, "stmin": 5, "padding": True, "ext_address": 0x23, "rx_ext_address": 0x12}), 128 ({"tx_id": 0x242, "rx_id": 0x642, "bs": 0, "stmin": 0, "padding": True, "ext_address": 0x45, "rx_ext_address": None}, 129 {"tx_id": 0x642, "rx_id": 0x242, "bs": 0, "stmin": 40, "padding": True, "ext_address": 0x45, "rx_ext_address": None}), 130 ({"tx_id": 0x123, "rx_id": 0x642, "bs": 1, "stmin": 1, "padding": False, "ext_address": None, "rx_ext_address": None}, 131 {"tx_id": 0x642, "rx_id": 0x123, "bs": 1, "stmin": 1, "padding": False, "ext_address": None, "rx_ext_address": None}),] 132 133for kwargs1, kwargs2 in kwargs: 134 print("Testing config %s, %s" % (kwargs1, kwargs2)) 135 with NativeCANSocket(iface0) as cs: 136 cs.sniff(timeout=0.01) 137 with ISOTPSoftSocket(iface0, **kwargs1) as s, ISOTPNativeSocket(iface0, **kwargs2) as ns: 138 ns.send(ISOTP(bytes.fromhex(message))) 139 isotp = s.recv() 140 assert isotp.data == dhex(message) 141 ns.send(ISOTP(bytes.fromhex("00 11 22"))) 142 isotp = s.recv() 143 assert (isotp.data == dhex("00 11 22")) 144 pks1 = cs.sniff(timeout=0.01) 145 with ISOTPNativeSocket(iface0, **kwargs1) as s, ISOTPSoftSocket(iface0, **kwargs2) as ns: 146 ns.send(ISOTP(bytes.fromhex(message))) 147 isotp = s.recv() 148 assert isotp.data == dhex(message) 149 ns.send(ISOTP(bytes.fromhex("00 11 22"))) 150 isotp = s.recv() 151 assert (isotp.data == dhex("00 11 22")) 152 pks2 = cs.sniff(timeout=0.01) 153 assert len(pks1) == len(pks2) and len(pks2) > 0 154 for p1, p2 in zip(pks1, pks2): 155 assert bytes(p1) == bytes(p2) 156 157 158+ ISOTPNativeSocket tests 159 160= Create ISOTP socket 161exit_if_no_isotp_module() 162s = ISOTPNativeSocket(iface0, tx_id=0x641, rx_id=0x241) 163 164= Send single frame ISOTP message 165exit_if_no_isotp_module() 166 167with new_can_socket(iface0) as cans: 168 s = ISOTPNativeSocket(iface0, tx_id=0x641, rx_id=0x241) 169 s.send(ISOTP(data=dhex("01 02 03 04 05"))) 170 can = cans.sniff(timeout=1, count=1)[0] 171 assert can.identifier == 0x641 172 assert can.data == dhex("05 01 02 03 04 05") 173 174 175= Send single frame ISOTP message Test init with CANSocket 176exit_if_no_isotp_module() 177cans = CANSocket(iface0) 178s = ISOTPNativeSocket(cans, tx_id=0x641, rx_id=0x241) 179s.send(ISOTP(data=dhex("01 02 03 04 05"))) 180can = cans.sniff(timeout=1, count=1)[0] 181assert can.identifier == 0x641 182assert can.data == dhex("05 01 02 03 04 05") 183cans.close() 184 185 186= Test init with wrong type 187exit_if_no_isotp_module() 188exception_catched = False 189try: 190 s = ISOTPNativeSocket(42, tx_id=0x641, rx_id=0x241) 191except Scapy_Exception: 192 exception_catched = True 193 194assert exception_catched 195 196= Send two-frame ISOTP message 197exit_if_no_isotp_module() 198 199evt = threading.Event() 200def acker(): 201 with new_can_socket(iface0) as cans: 202 evt.set() 203 can = cans.sniff(timeout=1, count=1)[0] 204 cans.send(CAN(identifier = 0x241, data=dhex("30 00 00"))) 205 206 207with new_can_socket(iface0) as cans: 208 t = Thread(target=acker) 209 t.start() 210 s = ISOTPNativeSocket(iface0, tx_id=0x641, rx_id=0x241) 211 evt.wait(timeout=5) 212 s.send(ISOTP(data=dhex("01 02 03 04 05 06 07 08"))) 213 can = cans.sniff(timeout=1, count=1)[0] 214 assert can.identifier == 0x641 215 assert can.data == dhex("10 08 01 02 03 04 05 06") 216 can = cans.sniff(timeout=1, count=1)[0] 217 assert can.identifier == 0x241 218 assert can.data == dhex("30 00 00") 219 can = cans.sniff(timeout=1, count=1)[0] 220 assert can.identifier == 0x641 221 assert can.data == dhex("21 07 08") 222 t.join(timeout=5) 223 assert not t.is_alive() 224 225= Send a single frame ISOTP message with padding 226exit_if_no_isotp_module() 227s = ISOTPNativeSocket(iface0, tx_id=0x641, rx_id=0x241, padding=True) 228 229with new_can_socket(iface0) as cans: 230 s.send(ISOTP(data=dhex("01"))) 231 can = cans.sniff(timeout=1, count=1)[0] 232 assert can.length == 8 233 234 235= Send a two-frame ISOTP message with padding 236exit_if_no_isotp_module() 237 238acker_ready = threading.Event() 239def acker(): 240 with new_can_socket(iface0) as acks: 241 acker_ready.set() 242 can = acks.sniff(timeout=1, count=1)[0] 243 acks.send(CAN(identifier = 0x241, data=dhex("30 00 00"))) 244 245with new_can_socket(iface0) as cans: 246 thread = Thread(target=acker) 247 thread.start() 248 acker_ready.wait(timeout=5) 249 s = ISOTPNativeSocket(iface0, tx_id=0x641, rx_id=0x241, padding=True) 250 s.send(ISOTP(data=dhex("01 02 03 04 05 06 07 08"))) 251 can = cans.sniff(timeout=1, count=1)[0] 252 assert can.identifier == 0x641 253 assert can.data == dhex("10 08 01 02 03 04 05 06") 254 can = cans.sniff(timeout=1, count=1)[0] 255 assert can.identifier == 0x241 256 assert can.data == dhex("30 00 00") 257 can = cans.sniff(timeout=1, count=1)[0] 258 assert can.identifier == 0x641 259 assert can.data == dhex("21 07 08 CC CC CC CC CC") 260 thread.join(5) 261 assert not thread.is_alive() 262 263 264= Receive a padded single frame ISOTP message with padding disabled 265exit_if_no_isotp_module() 266s = ISOTPNativeSocket(iface0, tx_id=0x641, rx_id=0x241, padding=False) 267with new_can_socket(iface0) as cans: 268 cans.send(CAN(identifier=0x241, data=dhex("02 05 06 00 00 00 00 00"))) 269 res = s.recv() 270 assert res.data == dhex("05 06") 271 272 273= Receive a padded single frame ISOTP message with padding enabled 274exit_if_no_isotp_module() 275s = ISOTPNativeSocket(iface0, tx_id=0x641, rx_id=0x241, padding=True) 276with new_can_socket(iface0) as cans: 277 cans.send(CAN(identifier=0x241, data=dhex("02 05 06 00 00 00 00 00"))) 278 res = s.recv() 279 assert res.data == dhex("05 06") 280 281 282= Receive a non-padded single frame ISOTP message with padding enabled 283exit_if_no_isotp_module() 284s = ISOTPNativeSocket(iface0, tx_id=0x641, rx_id=0x241, padding=True) 285with new_can_socket(iface0) as cans: 286 cans.send(CAN(identifier=0x241, data=dhex("02 05 06"))) 287 res = s.recv() 288 assert res.data == dhex("05 06") 289 290 291= Receive a padded two-frame ISOTP message with padding enabled 292exit_if_no_isotp_module() 293s = ISOTPNativeSocket(iface0, tx_id=0x641, rx_id=0x241, padding=True) 294with new_can_socket(iface0) as cans: 295 cans.send(CAN(identifier=0x241, data=dhex("10 09 01 02 03 04 05 06"))) 296 cans.send(CAN(identifier=0x241, data=dhex("21 07 08 09 00 00 00 00"))) 297 res = s.recv() 298 assert res.data == dhex("01 02 03 04 05 06 07 08 09") 299 300 301= Receive a padded two-frame ISOTP message with padding disabled 302exit_if_no_isotp_module() 303s = ISOTPNativeSocket(iface0, tx_id=0x641, rx_id=0x241, padding=False) 304with new_can_socket(iface0) as cans: 305 cans.send(CAN(identifier=0x241, data=dhex("10 09 01 02 03 04 05 06"))) 306 cans.send(CAN(identifier=0x241, data=dhex("21 07 08 09 00 00 00 00"))) 307 res = s.recv() 308 assert res.data == dhex("01 02 03 04 05 06 07 08 09") 309 310 311= Receive a single frame ISOTP message 312exit_if_no_isotp_module() 313s = ISOTPNativeSocket(iface0, tx_id=0x641, rx_id=0x241) 314with new_can_socket(iface0) as cans: 315 cans.send(CAN(identifier = 0x241, data = dhex("05 01 02 03 04 05"))) 316 isotp = s.recv() 317 assert isotp.data == dhex("01 02 03 04 05") 318 assert isotp.tx_id == 0x641 319 assert isotp.rx_id == 0x241 320 assert isotp.ext_address == None 321 assert isotp.rx_ext_address == None 322 323 324= Receive a single frame ISOTP message, with extended addressing 325exit_if_no_isotp_module() 326s = ISOTPNativeSocket(iface0, tx_id=0x641, rx_id=0x241, ext_address=0xc0, rx_ext_address=0xea) 327with new_can_socket(iface0) as cans: 328 cans.send(CAN(identifier = 0x241, data = dhex("EA 05 01 02 03 04 05"))) 329 isotp = s.recv() 330 assert isotp.data == dhex("01 02 03 04 05") 331 assert isotp.tx_id == 0x641 332 assert isotp.rx_id == 0x241 333 assert isotp.ext_address == 0xc0 334 assert isotp.rx_ext_address == 0xea 335 336 337= Receive a two-frame ISOTP message 338exit_if_no_isotp_module() 339s = ISOTPNativeSocket(iface0, tx_id=0x641, rx_id=0x241) 340with new_can_socket(iface0) as cans: 341 cans.send(CAN(identifier = 0x241, data = dhex("10 0B 01 02 03 04 05 06"))) 342 cans.send(CAN(identifier = 0x241, data = dhex("21 07 08 09 10 11"))) 343 isotp = s.recv() 344 assert isotp.data == dhex("01 02 03 04 05 06 07 08 09 10 11") 345 346= Receive a two-frame ISOTP message and test python with statement 347exit_if_no_isotp_module() 348with ISOTPNativeSocket(iface0, tx_id=0x641, rx_id=0x241) as s: 349 with new_can_socket(iface0) as cans: 350 cans.send(CAN(identifier = 0x241, data = dhex("10 0B 01 02 03 04 05 06"))) 351 cans.send(CAN(identifier = 0x241, data = dhex("21 07 08 09 10 11"))) 352 isotp = s.recv() 353 assert isotp.data == dhex("01 02 03 04 05 06 07 08 09 10 11") 354 355 356= Send single CANFD frame ISOTP message 357exit_if_no_isotp_module() 358 359with new_can_socket(iface0, fd=True) as cans: 360 s = ISOTPNativeSocket(iface0, tx_id=0x641, rx_id=0x241, fd=True) 361 s.send(ISOTP(data=dhex("01 02 03 04 05 06 07 08 09"))) 362 can = cans.sniff(timeout=1, count=1)[0] 363 assert can.identifier == 0x641 364 assert can.data == dhex("09 01 02 03 04 05 06 07 08 09") 365 366 367= ISOTP Socket sr1 test 368exit_if_no_isotp_module() 369 370txSock = ISOTPNativeSocket(iface0, tx_id=0x123, rx_id=0x321, basecls=ISOTP) 371txmsg = ISOTP(b'\x11\x22\x33') 372rx2 = None 373 374receiver_up = Event() 375 376def sender(): 377 global receiver_up 378 receiver_up.wait(timeout=5) 379 global txmsg 380 global rx2 381 rx2 = txSock.sr1(txmsg, timeout=1, verbose=True) 382 383def receiver(): 384 global receiver_up 385 with new_can_socket(iface0) as cans: 386 rx = cans.sniff(timeout=1, count=1, started_callback=receiver_up.set)[0] 387 cans.send(CAN(identifier=0x321, length=4, data=b'\x03\x7f\x22\x33')) 388 expectedrx = CAN(identifier=0x123, length=4, data=b'\x03\x11\x22\x33') 389 assert rx.length == expectedrx.length 390 assert rx.data == expectedrx.data 391 assert rx.identifier == expectedrx.identifier 392 393txThread = threading.Thread(target=sender) 394txThread.start() 395receiver() 396txThread.join(timeout=5) 397assert not txThread.is_alive() 398 399assert rx2 is not None 400assert rx2 == ISOTP(b'\x7f\x22\x33') 401assert rx2.answers(txmsg) 402 403= ISOTP Socket sr1 and ISOTP test 404exit_if_no_isotp_module() 405txSock = ISOTPNativeSocket(iface0, 0x123, 0x321, basecls=ISOTP) 406rxSock = ISOTPNativeSocket(iface0, 0x321, 0x123, basecls=ISOTP) 407msg = ISOTP(b'\x11\x22\x33\x11\x22\x33\x11\x22\x33\x11\x22\x33') 408rx2 = None 409 410receiver_up = Event() 411 412def sender(): 413 receiver_up.wait(timeout=5) 414 global rx2 415 rx2 = txSock.sr1(msg, timeout=1, verbose=True) 416 417def receiver(): 418 global rx 419 receiver_up.set() 420 rx = rxSock.recv() 421 rxSock.send(msg) 422 423txThread = threading.Thread(target=sender) 424txThread.start() 425receiver() 426txThread.join(timeout=5) 427assert not txThread.is_alive() 428 429assert rx == msg 430assert rxSock.send(msg) 431assert rx2 is not None 432assert rx2 == msg 433 434= ISOTP Socket sr1 and ISOTP test vice versa 435exit_if_no_isotp_module() 436 437rxSock = ISOTPNativeSocket(iface0, 0x321, 0x123, basecls=ISOTP) 438txSock = ISOTPNativeSocket(iface0, 0x123, 0x321, basecls=ISOTP) 439 440msg = ISOTP(b'\x11\x22\x33\x11\x22\x33\x11\x22\x33\x11\x22\x33') 441 442receiver_up = Event() 443 444def receiver(): 445 global rx2, sent 446 rx2 = rxSock.sniff(count=1, timeout=1, started_callback=receiver_up.set) 447 sent = rxSock.send(msg) 448 449def sender(): 450 global rx 451 receiver_up.wait(timeout=5) 452 rx = txSock.sr1(msg, timeout=1,verbose=True) 453 454rx2 = None 455sent = False 456rxThread = threading.Thread(target=receiver) 457rxThread.start() 458sender() 459rxThread.join(timeout=5) 460assert not rxThread.is_alive() 461 462assert rx == msg 463assert rx2[0] == msg 464assert sent 465 466= ISOTP Socket sniff 467exit_if_no_isotp_module() 468 469rxSock = ISOTPNativeSocket(iface0, 0x321, 0x123, basecls=ISOTP) 470txSock = ISOTPNativeSocket(iface0, 0x123, 0x321, basecls=ISOTP) 471succ = False 472 473receiver_up = Event() 474 475def receiver(): 476 rx = rxSock.sniff(count=5, timeout=1, started_callback=receiver_up.set) 477 msg = ISOTP(b'\x11\x22\x33\x11\x22\x33\x11\x22\x33\x11\x22\x33') 478 msg.data += b'0' 479 assert rx[0] == msg 480 msg.data += b'1' 481 assert rx[1] == msg 482 msg.data += b'2' 483 assert rx[2] == msg 484 msg.data += b'3' 485 assert rx[3] == msg 486 msg.data += b'4' 487 assert rx[4] == msg 488 global succ 489 succ = True 490 491def sender(): 492 receiver_up.wait(timeout=5) 493 msg = ISOTP(b'\x11\x22\x33\x11\x22\x33\x11\x22\x33\x11\x22\x33') 494 msg.data += b'0' 495 assert txSock.send(msg) 496 msg.data += b'1' 497 assert txSock.send(msg) 498 msg.data += b'2' 499 assert txSock.send(msg) 500 msg.data += b'3' 501 assert txSock.send(msg) 502 msg.data += b'4' 503 assert txSock.send(msg) 504 505rxThread = threading.Thread(target=receiver) 506rxThread.start() 507sender() 508rxThread.join(timeout=5) 509assert not rxThread.is_alive() 510 511assert succ 512 513+ ISOTPNativeSocket MITM attack tests 514~ vcan_socket needs_root linux 515 516= bridge and sniff with isotp native sockets set up vcan0 and vcan1 for package forwarding vcan1 517exit_if_no_isotp_module() 518 519isoTpSocket0 = ISOTPNativeSocket(iface0, tx_id=0x241, rx_id=0x641) 520isoTpSocket1 = ISOTPNativeSocket(iface1, tx_id=0x641, rx_id=0x241) 521bSocket0 = ISOTPNativeSocket(iface0, tx_id=0x641, rx_id=0x241) 522bSocket1 = ISOTPNativeSocket(iface1, tx_id=0x241, rx_id=0x641) 523 524bridgeStarted = threading.Event() 525def bridge(): 526 global bridgeStarted 527 def forwarding(pkt): 528 return pkt 529 bridge_and_sniff(if1=bSocket0, if2=bSocket1, xfrm12=forwarding, xfrm21=forwarding, timeout=2, count=1, started_callback=bridgeStarted.set) 530 bSocket0.close() 531 bSocket1.close() 532 global bSucc 533 bSucc = True 534 535bSucc = False 536 537threadBridge = threading.Thread(target=bridge) 538threadBridge.start() 539bridgeStarted.wait(timeout=5) 540isoTpSocket0.send(ISOTP(b'Request')) 541packetsVCan1 = isoTpSocket1.sniff(timeout=0.5, count=1) 542 543assert len(packetsVCan1) == 1 544 545isoTpSocket0.close() 546isoTpSocket1.close() 547 548threadBridge.join(timeout=5) 549assert not threadBridge.is_alive() 550 551assert bSucc 552 553= bridge and sniff with isotp native sockets set up vcan0 and vcan1 for package change to vcan1 554exit_if_no_isotp_module() 555 556isoTpSocket0 = ISOTPNativeSocket(iface0, tx_id=0x241, rx_id=0x641) 557isoTpSocket1 = ISOTPNativeSocket(iface1, tx_id=0x641, rx_id=0x241) 558bSocket0 = ISOTPNativeSocket(iface0, tx_id=0x641, rx_id=0x241) 559bSocket1 = ISOTPNativeSocket(iface1, tx_id=0x241, rx_id=0x641) 560 561bSucc = False 562 563bridgeStarted = threading.Event() 564def bridge(): 565 global bridgeStarted 566 global bSucc 567 def forwarding(pkt): 568 pkt.data = 'changed' 569 return pkt 570 bridge_and_sniff(if1=bSocket0, if2=bSocket1, xfrm12=forwarding, xfrm21=forwarding, timeout=0.5, started_callback=bridgeStarted.set) 571 bSocket0.close() 572 bSocket1.close() 573 bSucc = True 574 575threadBridge = threading.Thread(target=bridge) 576threadBridge.start() 577bridgeStarted.wait(timeout=5) 578isoTpSocket0.send(ISOTP(b'Request')) 579packetsVCan1 = isoTpSocket1.sniff(timeout=0.5, count=1) 580 581packetsVCan1[0].data = b'changed' 582assert len(packetsVCan1) == 1 583 584isoTpSocket0.close() 585isoTpSocket1.close() 586 587threadBridge.join(timeout=5) 588assert not threadBridge.is_alive() 589 590assert bSucc 591 592= bridge and sniff with isotp native sockets set up vcan0 and vcan1 for package forwarding in both directions 593exit_if_no_isotp_module() 594 595bSucc = False 596 597isoTpSocket0 = ISOTPNativeSocket(iface0, tx_id=0x241, rx_id=0x641) 598isoTpSocket1 = ISOTPNativeSocket(iface1, tx_id=0x641, rx_id=0x241) 599bSocket0 = ISOTPNativeSocket(iface0, tx_id=0x641, rx_id=0x241) 600bSocket1 = ISOTPNativeSocket(iface1, tx_id=0x241, rx_id=0x641) 601 602bridgeStarted = threading.Event() 603def bridge(): 604 global bridgeStarted 605 global bSucc 606 def forwarding(pkt): 607 return pkt 608 bridge_and_sniff(if1=bSocket0, if2=bSocket1, xfrm12=forwarding, xfrm21=forwarding, timeout=0.5, started_callback=bridgeStarted.set, count=2) 609 bSocket0.close() 610 bSocket1.close() 611 bSucc = True 612 613threadBridge = threading.Thread(target=bridge) 614threadBridge.start() 615bridgeStarted.wait(timeout=5) 616 617packetVcan0 = ISOTP(b'RequestVcan0') 618packetVcan1 = ISOTP(b'RequestVcan1') 619isoTpSocket0.send(packetVcan0) 620isoTpSocket1.send(packetVcan1) 621 622packetsVCan0 = isoTpSocket0.sniff(timeout=0.5, count=1) 623packetsVCan1 = isoTpSocket1.sniff(timeout=0.5, count=1) 624 625len(packetsVCan0) == 1 626len(packetsVCan1) == 1 627 628isoTpSocket0.close() 629isoTpSocket1.close() 630 631threadBridge.join(timeout=5) 632assert not threadBridge.is_alive() 633 634assert bSucc 635 636= bridge and sniff with isotp native sockets set up vcan0 and vcan1 for package change in both directions 637exit_if_no_isotp_module() 638 639bSucc = False 640 641isoTpSocket0 = ISOTPNativeSocket(iface0, tx_id=0x241, rx_id=0x641) 642isoTpSocket1 = ISOTPNativeSocket(iface1, tx_id=0x641, rx_id=0x241) 643bSocket0 = ISOTPNativeSocket(iface0, tx_id=0x641, rx_id=0x241) 644bSocket1 = ISOTPNativeSocket(iface1, tx_id=0x241, rx_id=0x641) 645 646bridgeStarted = threading.Event() 647def bridge(): 648 global bridgeStarted 649 global bSucc 650 def forwarding(pkt): 651 pkt.data = 'changed' 652 return pkt 653 bridge_and_sniff(if1=bSocket0, if2=bSocket1, xfrm12=forwarding, xfrm21=forwarding, timeout=0.5, started_callback=bridgeStarted.set, count=2) 654 bSocket0.close() 655 bSocket1.close() 656 bSucc = True 657 658threadBridge = threading.Thread(target=bridge) 659threadBridge.start() 660bridgeStarted.wait(timeout=5) 661 662packetVcan0 = ISOTP(b'RequestVcan0') 663packetVcan1 = ISOTP(b'RequestVcan1') 664isoTpSocket0.send(packetVcan0) 665isoTpSocket1.send(packetVcan1) 666 667packetsVCan0 = isoTpSocket0.sniff(timeout=0.5, count=1) 668packetsVCan1 = isoTpSocket1.sniff(timeout=0.5, count=1) 669 670packetsVCan0[0].data = b'changed' 671assert len(packetsVCan0) == 1 672packetsVCan1[0].data = b'changed' 673assert len(packetsVCan1) == 1 674 675isoTpSocket0.close() 676isoTpSocket1.close() 677 678threadBridge.join(timeout=5) 679assert not threadBridge.is_alive() 680 681assert bSucc 682 683+ Cleanup 684 685= Cleanup reference to ISOTPSoftSocket to let the thread end 686s = None 687 688= Delete vcan interfaces 689 690assert cleanup_interfaces() 691