1% Regression tests for ISOTPSoftSocket 2~ automotive_comm 3 4+ Configuration 5~ conf 6 7= Imports 8import time 9from io import BytesIO 10from scapy.layers.can import * 11from scapy.contrib.isotp import * 12from scapy.contrib.isotp.isotp_soft_socket import TimeoutScheduler 13from test.testsocket import TestSocket, cleanup_testsockets 14with open(scapy_path("test/contrib/automotive/interface_mockup.py")) as f: 15 exec(f.read()) 16 17= Redirect logging 18import logging 19from scapy.error import log_runtime 20 21from io import StringIO 22 23log_stream = StringIO() 24handler = logging.StreamHandler(log_stream) 25log_runtime.addHandler(handler) 26log_isotp.addHandler(handler) 27 28= Definition of utility functions 29 30# hexadecimal to bytes convenience function 31dhex = bytes.fromhex 32 33 34+ Test sniffer 35= Test sniffer with multiple frames 36 37test_frames = [ 38 (0x241, "EA 10 28 01 02 03 04 05"), 39 (0x641, "EA 30 03 00" ), 40 (0x241, "EA 21 06 07 08 09 0A 0B"), 41 (0x241, "EA 22 0C 0D 0E 0F 10 11"), 42 (0x241, "EA 23 12 13 14 15 16 17"), 43 (0x641, "EA 30 03 00" ), 44 (0x241, "EA 24 18 19 1A 1B 1C 1D"), 45 (0x241, "EA 25 1E 1F 20 21 22 23"), 46 (0x241, "EA 26 24 25 26 27 28" ), 47] 48 49with TestSocket(CAN) as s, TestSocket(CAN) as tx_sock: 50 s.pair(tx_sock) 51 for f in test_frames: 52 tx_sock.send(CAN(identifier=f[0], data=dhex(f[1]))) 53 sniffed = sniff(opened_socket=s, session=ISOTPSession, timeout=1, count=1) 54 55assert sniffed[0]['ISOTP'].data == bytearray(range(1, 0x29)) 56assert sniffed[0]['ISOTP'].tx_id == 0x641 57assert sniffed[0]['ISOTP'].ext_address == 0xEA 58assert sniffed[0]['ISOTP'].rx_id == 0x241 59assert sniffed[0]['ISOTP'].rx_ext_address == 0xEA 60 61+ ISOTPSoftSocket tests 62 63= CAN socket FD 64~ not_pypy needs_root linux vcan_socket 65 66with ISOTPSoftSocket(iface0, tx_id=0x641, rx_id=0x241, fd=True) as s: 67 assert s.impl.can_socket.fd == True 68 69= CAN socket non-FD 70~ not_pypy needs_root linux vcan_socket 71 72with ISOTPSoftSocket(iface0, tx_id=0x641, rx_id=0x241) as s: 73 assert s.impl.can_socket.fd == False 74 75= Single-frame receive 76 77with TestSocket(CAN) as cans, TestSocket(CAN) as stim, ISOTPSoftSocket(cans, tx_id=0x641, rx_id=0x241) as s: 78 cans.pair(stim) 79 stim.send(CAN(identifier=0x241, data=dhex("05 01 02 03 04 05"))) 80 pkts = s.sniff(count=1, timeout=1) 81 if not len(pkts): 82 s.failure_analysis() 83 raise Scapy_Exception("ERROR") 84 msg = pkts[0] 85 assert msg.data == dhex("01 02 03 04 05") 86 87= Single-frame receive FD 88 89with TestSocket(CANFD) as cans, TestSocket(CANFD) as stim, ISOTPSoftSocket(cans, tx_id=0x641, rx_id=0x241, fd=True) as s: 90 pl_sizes_testings = [1, 5, 7, 8, 15, 20, 35, 40, 46, 62] 91 data_str = "" 92 data_str_offset = 0 93 cans.pair(stim) 94 for size_to_send in pl_sizes_testings: 95 if size_to_send > 7: 96 data_str = "00 {} {}".format("%02X" % size_to_send, " ".join(["%02X" % x for x in range(size_to_send)])) 97 data_str_offset = 6 98 else: 99 data_str = "{} {}".format("%02X" % size_to_send, " ".join(["%02X" % x for x in range(size_to_send)])) 100 data_str_offset = 2 101 stim.send(CANFD(identifier=0x241, data=dhex(data_str))) 102 pkts = s.sniff(count=1, timeout=1) 103 if not len(pkts): 104 s.failure_analysis() 105 raise Scapy_Exception("ERROR") 106 msg = pkts[0] 107 assert msg.data == dhex(data_str[data_str_offset:]) 108 109= Single-frame send 110 111with TestSocket(CAN) as cans, TestSocket(CAN) as stim, ISOTPSoftSocket(cans, tx_id=0x641, rx_id=0x241) as s: 112 cans.pair(stim) 113 s.send(ISOTP(dhex("01 02 03 04 05"))) 114 pkts = stim.sniff(count=1, timeout=1) 115 if not len(pkts): 116 s.failure_analysis() 117 raise Scapy_Exception("ERROR") 118 msg = pkts[0] 119 assert msg.data == dhex("05 01 02 03 04 05") 120 121= Single-frame send FD 122 123with TestSocket(CANFD) as cans, TestSocket(CANFD) as stim, ISOTPSoftSocket(cans, tx_id=0x641, rx_id=0x241, fd=True) as s: 124 pl_sizes_testings = [1, 5, 7, 8, 15, 20, 35, 40, 46, 62] 125 data_str = "" 126 data_str_offset = 0 127 cans.pair(stim) 128 for size_to_send in pl_sizes_testings: 129 if size_to_send > 7: 130 data_str = "00 {} {}".format("%02X" % size_to_send, " ".join(["%02X" % x for x in range(size_to_send)])) 131 data_str_offset = 6 132 else: 133 data_str = "{} {}".format("%02X" % size_to_send, " ".join(["%02X" % x for x in range(size_to_send)])) 134 data_str_offset = 2 135 s.send(ISOTP(dhex(data_str[data_str_offset:]))) 136 pkts = stim.sniff(count=1, timeout=1) 137 if not len(pkts): 138 s.failure_analysis() 139 raise Scapy_Exception("ERROR") 140 msg = pkts[0] 141 assert msg.data == dhex(data_str) 142 143= Two frame receive 144 145with TestSocket(CAN) as cans, TestSocket(CAN) as stim, ISOTPSoftSocket(cans, tx_id=0x641, rx_id=0x241) as s: 146 cans.pair(stim) 147 stim.send(CAN(identifier=0x241, data=dhex("10 09 01 02 03 04 05 06"))) 148 pkts = stim.sniff(count=1, timeout=1) 149 if not len(pkts): 150 s.failure_analysis() 151 raise Scapy_Exception("ERROR") 152 c = pkts[0] 153 assert (c.data == dhex("30 00 00")) 154 stim.send(CAN(identifier=0x241, data=dhex("21 07 08 09 00 00 00 00"))) 155 pkts = s.sniff(count=1, timeout=1) 156 if not len(pkts): 157 s.failure_analysis() 158 raise Scapy_Exception("ERROR") 159 msg = pkts[0] 160 assert msg.data == dhex("01 02 03 04 05 06 07 08 09") 161 162 163= Two frame receive FD 164 165with TestSocket(CANFD) as cans, TestSocket(CANFD) as stim, ISOTPSoftSocket(cans, tx_id=0x641, rx_id=0x241, fd=True) as s: 166 cans.pair(stim) 167 stim.send(CANFD(identifier=0x241, data=dhex("10 09 01 02 03 04 05 06 07 08 09 0A 0B"))) 168 pkts = stim.sniff(count=1, timeout=1) 169 if not len(pkts): 170 s.failure_analysis() 171 raise Scapy_Exception("ERROR") 172 c = pkts[0] 173 assert (c.data == dhex("30 00 00")) 174 stim.send(CANFD(identifier=0x241, data=dhex("21 07 08 09 00 00 00 00"))) 175 pkts = s.sniff(count=1, timeout=1) 176 if not len(pkts): 177 s.failure_analysis() 178 raise Scapy_Exception("ERROR") 179 msg = pkts[0] 180 assert msg.data == dhex("01 02 03 04 05 06 07 08 09") 181 182 183= 20000 bytes receive 184 185def test(): 186 with TestSocket(CAN) as cans, TestSocket(CAN) as stim, ISOTPSoftSocket(cans, tx_id=0x641, rx_id=0x241) as s: 187 cans.pair(stim) 188 data = dhex("01 02 03 04 05") * 4000 189 cf = ISOTP(data, rx_id=0x241).fragment() 190 ff = cf.pop(0) 191 cs = stim.sniff(count=1, timeout=3, started_callback=lambda: stim.send(ff)) 192 assert len(cs) 193 c = cs[0] 194 assert (c.data == dhex("30 00 00")) 195 for f in cf: 196 _ = stim.send(f) 197 msgs = s.sniff(count=1, timeout=30) 198 print(msgs) 199 msg = msgs[0] 200 assert msg.data == data 201 202test() 203 204= 20000 bytes send 205 206def test(): 207 with TestSocket(CAN) as cans, TestSocket(CAN) as stim, ISOTPSoftSocket(cans, tx_id=0x641, rx_id=0x241) as s: 208 cans.pair(stim) 209 data = dhex("01 02 03 04 05")*4000 210 msg = ISOTP(data, rx_id=0x641) 211 fragments = msg.fragment() 212 ack = CAN(identifier=0x241, data=dhex("30 00 00")) 213 ff = stim.sniff(timeout=1, count=1, 214 started_callback=lambda:s.send(msg)) 215 assert len(ff) == 1 216 cfs = stim.sniff(timeout=20, count=len(fragments) - 1, 217 started_callback=lambda: stim.send(ack)) 218 for fragment, cf in zip(fragments, ff + cfs): 219 assert (bytes(fragment) == bytes(cf)) 220 221test() 222 223= 20000 bytes send FD 224 225def testfd(): 226 with TestSocket(CANFD) as cans, TestSocket(CANFD) as stim, ISOTPSoftSocket(cans, tx_id=0x641, rx_id=0x241, fd=True) as s: 227 cans.pair(stim) 228 data = dhex("01 02 03 04 05")*4006 229 msg = ISOTP(data, rx_id=0x641) 230 fragments = msg.fragment(fd=True) 231 ack = CANFD(identifier=0x241, data=dhex("30 00 00")) 232 ff = stim.sniff(timeout=1, count=1, 233 started_callback=lambda:s.send(msg)) 234 assert len(ff) == 1 235 cfs = stim.sniff(timeout=20, count=len(fragments) - 1, 236 started_callback=lambda: stim.send(ack)) 237 for fragment, cf in zip(fragments, ff + cfs): 238 assert (bytes(fragment) == bytes(cf)) 239 240testfd() 241 242= Close ISOTPSoftSocket 243 244with TestSocket(CAN) as cans, TestSocket(CAN) as stim, ISOTPSoftSocket(cans, tx_id=0x641, rx_id=0x241) as s: 245 cans.pair(stim) 246 s.close() 247 s = None 248 249= Test on_recv function with single frame 250with ISOTPSoftSocket(TestSocket(CAN), tx_id=0x641, rx_id=0x241) as s: 251 s.ins.on_recv(CAN(identifier=0x241, data=dhex("05 01 02 03 04 05"))) 252 msg, ts = s.ins.rx_queue.recv() 253 assert msg == dhex("01 02 03 04 05") 254 255= Test on_recv function with single frame FD 256with ISOTPSoftSocket(TestSocket(CANFD), tx_id=0x641, rx_id=0x241, fd=True) as s: 257 pl_sizes_testings = [1, 5, 7, 8, 15, 20, 35, 40, 46, 62] 258 data_str = "" 259 data_str_offset = 0 260 for size_to_send in pl_sizes_testings: 261 if size_to_send > 7: 262 data_str = "00 {} {}".format("%02X" % size_to_send, " ".join(["%02X" % x for x in range(size_to_send)])) 263 data_str_offset = 6 264 else: 265 data_str = "{} {}".format("%02X" % size_to_send, " ".join(["%02X" % x for x in range(size_to_send)])) 266 data_str_offset = 2 267 s.ins.on_recv(CANFD(identifier=0x241, data=dhex(data_str))) 268 msg, ts = s.ins.rx_queue.recv() 269 assert msg == dhex(data_str[data_str_offset:]) 270 271= Test on_recv function with empty frame 272with ISOTPSoftSocket(TestSocket(CAN), tx_id=0x641, rx_id=0x241) as s: 273 s.ins.on_recv(CAN(identifier=0x241, data=b"")) 274 assert s.ins.rx_queue.empty() 275 276= Test on_recv function with single frame and extended addressing 277with ISOTPSoftSocket(TestSocket(CAN), tx_id=0x641, rx_id=0x241, rx_ext_address=0xea) as s: 278 cf = CAN(identifier=0x241, data=dhex("EA 05 01 02 03 04 05")) 279 s.ins.on_recv(cf) 280 msg, ts = s.ins.rx_queue.recv() 281 assert msg == dhex("01 02 03 04 05") 282 assert ts == cf.time 283 284 285= Test on_recv function with single frame and extended addressing FD 286with ISOTPSoftSocket(TestSocket(CANFD), tx_id=0x641, rx_id=0x241, rx_ext_address=0xea, fd=True) as s: 287 pl_sizes_testings = [1, 5, 7, 8, 15, 20, 35, 40, 46, 62] 288 data_str = "" 289 data_str_offset = 0 290 for size_to_send in pl_sizes_testings: 291 if size_to_send > 7: 292 data_str = "EA 00 {} {}".format("%02X" % size_to_send, " ".join(["%02X" % x for x in range(size_to_send)])) 293 data_str_offset = 8 294 else: 295 data_str = "EA {} {}".format("%02X" % size_to_send, " ".join(["%02X" % x for x in range(size_to_send)])) 296 data_str_offset = 5 297 cf = CANFD(identifier=0x241, data=dhex(data_str)) 298 s.ins.on_recv(cf) 299 msg, ts = s.ins.rx_queue.recv() 300 assert msg == dhex(data_str[data_str_offset:]) 301 assert ts == cf.time 302 303= CF is sent when first frame is received 304cans = TestSocket(CAN) 305can_out = TestSocket(CAN) 306cans.pair(can_out) 307with ISOTPSoftSocket(cans, tx_id=0x641, rx_id=0x241) as s: 308 s.ins.on_recv(CAN(identifier=0x241, data=dhex("10 20 01 02 03 04 05 06"))) 309 can = can_out.sniff(timeout=1, count=1)[0] 310 assert can.identifier == 0x641 311 assert can.data == dhex("30 00 00") 312 313cans.close() 314can_out.close() 315 316+ Testing ISOTPSoftSocket with an actual CAN socket 317 318= Verify that packets are not lost if they arrive before the sniff() is called 319with TestSocket(CAN) as ss, TestSocket(CAN) as sr: 320 ss.pair(sr) 321 tx_func = lambda: ss.send(CAN(identifier=0x111, data=b"\x01\x23\x45\x67")) 322 p = sr.sniff(count=1, timeout=0.2, started_callback=tx_func) 323 assert len(p)==1 324 tx_func = lambda: ss.send(CAN(identifier=0x111, data=b"\x89\xab\xcd\xef")) 325 p = sr.sniff(count=1, timeout=0.2, started_callback=tx_func) 326 assert len(p)==1 327 328= Send single frame ISOTP message, using send 329with TestSocket(CAN) as isocan, \ 330 ISOTPSoftSocket(isocan, tx_id=0x641, rx_id=0x241) as s, \ 331 TestSocket(CAN) as cans: 332 cans.pair(isocan) 333 can = cans.sniff(timeout=2, count=1, started_callback=lambda: s.send(ISOTP(data=dhex("01 02 03 04 05")))) 334 assert can[0].identifier == 0x641 335 assert can[0].data == dhex("05 01 02 03 04 05") 336 337= Send many single frame ISOTP messages, using send 338 339with TestSocket(CAN) as isocan, \ 340 ISOTPSoftSocket(isocan, tx_id=0x641, rx_id=0x241) as s, \ 341 TestSocket(CAN) as cans: 342 cans.pair(isocan) 343 for i in range(100): 344 data = dhex("01 02 03 04 05") + struct.pack("B", i) 345 expected = struct.pack("B", len(data)) + data 346 can = cans.sniff(timeout=4, count=1, started_callback=lambda: s.send(ISOTP(data=data))) 347 assert can[0].identifier == 0x641 348 print(can[0].data, data) 349 assert can[0].data == expected 350 351 352= Send two-frame ISOTP message, using send 353with TestSocket(CAN) as isocan, ISOTPSoftSocket(isocan, tx_id=0x641, rx_id=0x241) as s, TestSocket(CAN) as cans: 354 cans.pair(isocan) 355 can = cans.sniff(timeout=1, count=1, started_callback=lambda: s.send(ISOTP(data=dhex("01 02 03 04 05 06 07 08")))) 356 assert can[0].identifier == 0x641 357 assert can[0].data == dhex("10 08 01 02 03 04 05 06") 358 can = cans.sniff(timeout=1, count=1, started_callback=lambda: cans.send(CAN(identifier = 0x241, data=dhex("30 00 00")))) 359 assert can[0].identifier == 0x641 360 assert can[0].data == dhex("21 07 08") 361 362= Send two-frame ISOTP message, using send FD 363with TestSocket(CANFD) as isocan, ISOTPSoftSocket(isocan, tx_id=0x641, rx_id=0x241, fd=True) as s, TestSocket(CANFD) as cans: 364 size_to_send = 100 365 max_pl_size = 62 366 data_str = "{}".format(" ".join(["%02X" % x for x in range(size_to_send)])) 367 cans.pair(isocan) 368 can = cans.sniff(timeout=1, count=1, started_callback=lambda: s.send(dhex(data_str))) 369 assert can[0].identifier == 0x641 370 assert can[0].data == dhex("10 {} {}".format("%02X" % size_to_send, " ".join(["%02X" % x for x in range(max_pl_size)]))) 371 can = cans.sniff(timeout=1, count=1, started_callback=lambda: cans.send(CANFD(identifier = 0x241, data=dhex("30 00 00")))) 372 assert can[0].identifier == 0x641 373 assert can[0].data == dhex("21 {}".format(" ".join(["%02X" % x for x in range(max_pl_size, size_to_send)]))) 374 375= Send single frame ISOTP message 376with TestSocket(CAN) as cans, TestSocket(CAN) as isocan, ISOTPSoftSocket(isocan, tx_id=0x641, rx_id=0x241) as s: 377 cans.pair(isocan) 378 s.send(ISOTP(data=dhex("01 02 03 04 05"))) 379 can = cans.sniff(timeout=1, count=1) 380 assert can[0].identifier == 0x641 381 assert can[0].data == dhex("05 01 02 03 04 05") 382 383 384= Send two-frame ISOTP message 385 386acks = TestSocket(CAN) 387 388acker_ready = threading.Event() 389def acker(): 390 acker_ready.set() 391 can_pkt = acks.sniff(timeout=1, count=1) 392 can = can_pkt[0] 393 acks.send(CAN(identifier = 0x241, data=dhex("30 00 00"))) 394 395thread = Thread(target=acker) 396thread.start() 397acker_ready.wait(timeout=5) 398with TestSocket(CAN) as isocan, ISOTPSoftSocket(isocan, tx_id=0x641, rx_id=0x241) as s, TestSocket(CAN) as cans: 399 cans.pair(isocan) 400 cans.pair(acks) 401 isocan.pair(acks) 402 s.send(ISOTP(data=dhex("01 02 03 04 05 06 07 08"))) 403 pkts = cans.sniff(timeout=1, count=1) 404 if not len(pkts): 405 s.failure_analysis() 406 raise Scapy_Exception("ERROR") 407 can = pkts[0] 408 assert can.identifier == 0x641 409 assert can.data == dhex("10 08 01 02 03 04 05 06") 410 pkts = cans.sniff(timeout=1, count=1) 411 if not len(pkts): 412 s.failure_analysis() 413 raise Scapy_Exception("ERROR") 414 can = pkts[0] 415 assert can.identifier == 0x241 416 assert can.data == dhex("30 00 00") 417 pkts = cans.sniff(timeout=1, count=1) 418 if not len(pkts): 419 s.failure_analysis() 420 raise Scapy_Exception("ERROR") 421 can = pkts[0] 422 assert can.identifier == 0x641 423 assert can.data == dhex("21 07 08") 424 425thread.join(15) 426acks.close() 427assert not thread.is_alive() 428 429= Send two-frame ISOTP message FD 430 431acks = TestSocket(CANFD) 432 433acker_ready = threading.Event() 434def acker(): 435 acker_ready.set() 436 can_pkt = acks.sniff(timeout=1, count=1) 437 can = can_pkt[0] 438 acks.send(CANFD(identifier = 0x241, data=dhex("30 00 00"))) 439 440thread = Thread(target=acker) 441thread.start() 442acker_ready.wait(timeout=5) 443with TestSocket(CANFD) as isocan, ISOTPSoftSocket(isocan, tx_id=0x641, rx_id=0x241, fd=True) as s, TestSocket(CANFD) as cans: 444 size_to_send = 123 445 max_pl_size = 62 446 data_str = "{}".format(" ".join(["%02X" % x for x in range(size_to_send)])) 447 cans.pair(isocan) 448 cans.pair(acks) 449 isocan.pair(acks) 450 s.send(dhex(data_str)) 451 pkts = cans.sniff(timeout=1, count=1) 452 if not len(pkts): 453 s.failure_analysis() 454 raise Scapy_Exception("ERROR") 455 can = pkts[0] 456 assert can.identifier == 0x641 457 assert can.data == dhex("10 {} {}".format("%02X" % size_to_send, " ".join(["%02X" % x for x in range(max_pl_size)]))) 458 pkts = cans.sniff(timeout=1, count=1) 459 if not len(pkts): 460 s.failure_analysis() 461 raise Scapy_Exception("ERROR") 462 can = pkts[0] 463 assert can.identifier == 0x241 464 assert can.data == dhex("30 00 00") 465 pkts = cans.sniff(timeout=1, count=1) 466 if not len(pkts): 467 s.failure_analysis() 468 raise Scapy_Exception("ERROR") 469 can = pkts[0] 470 assert can.identifier == 0x641 471 assert can.data == dhex("21 {}".format(" ".join(["%02X" % x for x in range(max_pl_size, size_to_send)]))) 472 473thread.join(15) 474acks.close() 475assert not thread.is_alive() 476 477= Send two-frame ISOTP message with bs 478 479acks = TestSocket(CAN) 480acker_ready = threading.Event() 481def acker(): 482 acker_ready.set() 483 can_pkt = acks.sniff(timeout=1, count=1) 484 acks.send(CAN(identifier = 0x241, data=dhex("30 20 00"))) 485 486thread = Thread(target=acker) 487thread.start() 488acker_ready.wait(timeout=5) 489with TestSocket(CAN) as isocan, ISOTPSoftSocket(isocan, tx_id=0x641, rx_id=0x241) as s, TestSocket(CAN) as cans: 490 cans.pair(isocan) 491 cans.pair(acks) 492 isocan.pair(acks) 493 s.send(ISOTP(data=dhex("01 02 03 04 05 06 07 08"))) 494 pkts = cans.sniff(timeout=1, count=1) 495 if not len(pkts): 496 s.failure_analysis() 497 raise Scapy_Exception("ERROR") 498 can = pkts[0] 499 assert can.identifier == 0x641 500 assert can.data == dhex("10 08 01 02 03 04 05 06") 501 pkts = cans.sniff(timeout=1, count=1) 502 if not len(pkts): 503 s.failure_analysis() 504 raise Scapy_Exception("ERROR") 505 can = pkts[0] 506 assert can.identifier == 0x241 507 assert can.data == dhex("30 20 00") 508 pkts = cans.sniff(timeout=1, count=1) 509 if not len(pkts): 510 s.failure_analysis() 511 raise Scapy_Exception("ERROR") 512 can = pkts[0] 513 assert can.identifier == 0x641 514 assert can.data == dhex("21 07 08") 515 516thread.join(15) 517acks.close() 518assert not thread.is_alive() 519 520= Send two-frame ISOTP message with bs FD 521 522acks = TestSocket(CANFD) 523acker_ready = threading.Event() 524def acker(): 525 acker_ready.set() 526 can_pkt = acks.sniff(timeout=1, count=1) 527 acks.send(CANFD(identifier = 0x241, data=dhex("30 20 00"))) 528 529thread = Thread(target=acker) 530thread.start() 531acker_ready.wait(timeout=5) 532with TestSocket(CANFD) as isocan, ISOTPSoftSocket(isocan, tx_id=0x641, rx_id=0x241, fd=True) as s, TestSocket(CANFD) as cans: 533 size_to_send = 124 534 max_pl_size = 62 535 data_str = "{}".format(" ".join(["%02X" % x for x in range(size_to_send)])) 536 cans.pair(isocan) 537 cans.pair(acks) 538 isocan.pair(acks) 539 s.send(ISOTP(data=dhex(data_str))) 540 pkts = cans.sniff(timeout=1, count=1) 541 if not len(pkts): 542 s.failure_analysis() 543 raise Scapy_Exception("ERROR") 544 can = pkts[0] 545 assert can.identifier == 0x641 546 assert can.data == dhex("10 {} {}".format("%02X" % size_to_send, " ".join(["%02X" % x for x in range(max_pl_size)]))) 547 pkts = cans.sniff(timeout=1, count=1) 548 if not len(pkts): 549 s.failure_analysis() 550 raise Scapy_Exception("ERROR") 551 can = pkts[0] 552 assert can.identifier == 0x241 553 assert can.data == dhex("30 20 00") 554 pkts = cans.sniff(timeout=1, count=1) 555 if not len(pkts): 556 s.failure_analysis() 557 raise Scapy_Exception("ERROR") 558 can = pkts[0] 559 assert can.identifier == 0x641 560 assert can.data == dhex("21 {}".format(" ".join(["%02X" % x for x in range(max_pl_size, size_to_send)]))) 561 562thread.join(15) 563acks.close() 564assert not thread.is_alive() 565 566= Send two-frame ISOTP message with ST 567acks = TestSocket(CAN) 568acker_ready = threading.Event() 569def acker(): 570 acker_ready.set() 571 acks.sniff(timeout=1, count=1) 572 acks.send(CAN(identifier = 0x241, data=dhex("30 00 10"))) 573 574thread = Thread(target=acker) 575thread.start() 576acker_ready.wait(timeout=5) 577with TestSocket(CAN) as isocan, ISOTPSoftSocket(isocan, tx_id=0x641, rx_id=0x241) as s, TestSocket(CAN) as cans: 578 cans.pair(isocan) 579 cans.pair(acks) 580 isocan.pair(acks) 581 s.send(ISOTP(data=dhex("01 02 03 04 05 06 07 08"))) 582 pkts = cans.sniff(timeout=1, count=1) 583 if not len(pkts): 584 s.failure_analysis() 585 raise Scapy_Exception("ERROR") 586 can = pkts[0] 587 assert can.identifier == 0x641 588 assert can.data == dhex("10 08 01 02 03 04 05 06") 589 pkts = cans.sniff(timeout=1, count=1) 590 if not len(pkts): 591 s.failure_analysis() 592 raise Scapy_Exception("ERROR") 593 can = pkts[0] 594 assert can.identifier == 0x241 595 assert can.data == dhex("30 00 10") 596 pkts = cans.sniff(timeout=1, count=1) 597 if not len(pkts): 598 s.failure_analysis() 599 raise Scapy_Exception("ERROR") 600 can = pkts[0] 601 assert can.identifier == 0x641 602 assert can.data == dhex("21 07 08") 603 604thread.join(15) 605acks.close() 606assert not thread.is_alive() 607 608= Send two-frame ISOTP message with ST FD 609acks = TestSocket(CANFD) 610acker_ready = threading.Event() 611def acker(): 612 acker_ready.set() 613 acks.sniff(timeout=1, count=1) 614 acks.send(CANFD(identifier = 0x241, data=dhex("30 00 10"))) 615 616thread = Thread(target=acker) 617thread.start() 618acker_ready.wait(timeout=5) 619with TestSocket(CANFD) as isocan, ISOTPSoftSocket(isocan, tx_id=0x641, rx_id=0x241, fd=True) as s, TestSocket(CANFD) as cans: 620 size_to_send = 124 621 max_pl_size = 62 622 data_str = "{}".format(" ".join(["%02X" % x for x in range(size_to_send)])) 623 cans.pair(isocan) 624 cans.pair(acks) 625 isocan.pair(acks) 626 s.send(dhex(data_str)) 627 pkts = cans.sniff(timeout=1, count=1) 628 if not len(pkts): 629 s.failure_analysis() 630 raise Scapy_Exception("ERROR") 631 can = pkts[0] 632 assert can.identifier == 0x641 633 assert can.data == dhex("10 {} {}".format("%02X" % size_to_send, " ".join(["%02X" % x for x in range(max_pl_size)]))) 634 pkts = cans.sniff(timeout=1, count=1) 635 if not len(pkts): 636 s.failure_analysis() 637 raise Scapy_Exception("ERROR") 638 can = pkts[0] 639 assert can.identifier == 0x241 640 assert can.data == dhex("30 00 10") 641 pkts = cans.sniff(timeout=1, count=1) 642 if not len(pkts): 643 s.failure_analysis() 644 raise Scapy_Exception("ERROR") 645 can = pkts[0] 646 assert can.identifier == 0x641 647 assert can.data == dhex("21 {}".format(" ".join(["%02X" % x for x in range(max_pl_size, size_to_send)]))) 648 649thread.join(15) 650acks.close() 651assert not thread.is_alive() 652 653= Receive a single frame ISOTP message 654 655with TestSocket(CAN) as isocan, ISOTPSoftSocket(isocan, tx_id=0x641, rx_id=0x241) as s, TestSocket(CAN) as cans: 656 cans.pair(isocan) 657 cans.send(CAN(identifier = 0x241, data = dhex("05 01 02 03 04 05"))) 658 pkts = s.sniff(count=1, timeout=2) 659 if not len(pkts): 660 s.failure_analysis() 661 raise Scapy_Exception("ERROR") 662 isotp = pkts[0] 663 assert isotp.data == dhex("01 02 03 04 05") 664 assert isotp.tx_id == 0x641 665 assert isotp.rx_id == 0x241 666 assert isotp.ext_address == None 667 assert isotp.rx_ext_address == None 668 669 670= Receive a single frame ISOTP message, with extended addressing 671 672with TestSocket(CAN) as isocan, ISOTPSoftSocket(isocan, tx_id=0x641, rx_id=0x241, ext_address=0xc0, rx_ext_address=0xea) as s, TestSocket(CAN) as cans: 673 cans.pair(isocan) 674 cans.send(CAN(identifier = 0x241, data = dhex("EA 05 01 02 03 04 05"))) 675 pkts = s.sniff(count=1, timeout=2) 676 if not len(pkts): 677 s.failure_analysis() 678 raise Scapy_Exception("ERROR") 679 isotp = pkts[0] 680 assert isotp.data == dhex("01 02 03 04 05") 681 assert isotp.tx_id == 0x641 682 assert isotp.rx_id == 0x241 683 assert isotp.ext_address == 0xc0 684 assert isotp.rx_ext_address == 0xea 685 686 687= Receive frames from CandumpReader 688candump_fd = BytesIO(b''' vcan0 541 [8] 10 0A DE AD BE EF AA AA 689 vcan0 241 [3] 30 00 00 690 vcan0 541 [5] 21 AA AA AA AA 691 vcan0 541 [8] 10 0A DE AD BE EF AA AA 692 vcan0 541 [8] 10 0A DE AD BE EF AA AA 693 vcan0 241 [3] 30 00 00 694 vcan0 541 [5] 21 AA AA AA AA 695 vcan0 541 [8] 10 0A DE AD BE EF AA AA 696 vcan0 241 [3] 30 00 00 697 vcan0 541 [5] 21 AA AA AA AA 698 vcan0 541 [8] 10 0A DE AD BE EF AA AA 699 vcan0 241 [3] 30 00 00 700 vcan0 541 [5] 21 AA AA AA AA 701 vcan0 541 [8] 10 0A DE AD BE EF AA AA 702 vcan0 241 [3] 30 00 00 703 vcan0 541 [5] 21 AA AA AA AA 704 vcan0 541 [8] 10 0A DE AD BE EF AA AA 705 vcan0 241 [3] 30 00 00 706 vcan0 541 [5] 21 AA AA AA AA''') 707 708with ISOTPSoftSocket(CandumpReader(candump_fd), tx_id=0x241, rx_id=0x541, listen_only=True) as s: 709 pkts = s.sniff(timeout=2, count=6) 710 assert len(pkts) == 6 711 if not len(pkts): 712 s.failure_analysis() 713 raise Scapy_Exception("ERROR") 714 isotp = pkts[0] 715 print(repr(isotp)) 716 print(hex(isotp.tx_id)) 717 print(hex(isotp.rx_id)) 718 assert isotp.data == dhex("DE AD BE EF AA AA AA AA AA AA") 719 assert isotp.tx_id == 0x241 720 assert isotp.rx_id == 0x541 721 722= Receive frames from CandumpReader with ISOTPSniffer without extended addressing 723candump_fd = BytesIO(b''' vcan0 541 [8] 10 0A DE AD BE EF AA AA 724 vcan0 241 [3] 30 00 00 725 vcan0 541 [5] 21 AA AA AA AA 726 vcan0 541 [8] 10 0A DE AD BE EF AA AA 727 vcan0 541 [8] 10 0A DE AD BE EF AA AA 728 vcan0 241 [3] 30 00 00 729 vcan0 541 [5] 21 AA AA AA AA 730 vcan0 541 [8] 10 0A DE AD BE EF AA AA 731 vcan0 241 [3] 30 00 00 732 vcan0 541 [5] 21 AA AA AA AA 733 vcan0 541 [8] 10 0A DE AD BE EF AA AA 734 vcan0 241 [3] 30 00 00 735 vcan0 541 [5] 21 AA AA AA AA 736 vcan0 541 [8] 10 0A DE AD BE EF AA AA 737 vcan0 241 [3] 30 00 00 738 vcan0 541 [5] 21 AA AA AA AA 739 vcan0 541 [8] 10 0A DE AD BE EF AA AA 740 vcan0 241 [3] 30 00 00 741 vcan0 541 [5] 21 AA AA AA AA''') 742 743pkts = sniff(opened_socket=CandumpReader(candump_fd), session=ISOTPSession(use_ext_address=False), timeout=1) 744assert len(pkts) == 6 745 746if not len(pkts): 747 s.failure_analysis() 748 raise Scapy_Exception("ERROR") 749 750isotp = pkts[0] 751assert isotp.data == dhex("DE AD BE EF AA AA AA AA AA AA") 752assert (isotp.rx_id == 0x541) 753 754= Receive frames from CandumpReader with ISOTPSniffer 755* all flow control frames are detected as single frame with extended address 756 757candump_fd = BytesIO(b''' vcan0 541 [8] 10 0A DE AD BE EF AA AA 758 vcan0 241 [3] 30 00 00 759 vcan0 541 [5] 21 AA AA AA AA 760 vcan0 541 [8] 10 0A DE AD BE EF AA AA 761 vcan0 541 [8] 10 0A DE AD BE EF AA AA 762 vcan0 241 [3] 30 00 00 763 vcan0 541 [5] 21 AA AA AA AA 764 vcan0 541 [8] 10 0A DE AD BE EF AA AA 765 vcan0 241 [3] 30 00 00 766 vcan0 541 [5] 21 AA AA AA AA 767 vcan0 541 [8] 10 0A DE AD BE EF AA AA 768 vcan0 241 [3] 30 00 00 769 vcan0 541 [5] 21 AA AA AA AA 770 vcan0 541 [8] 10 0A DE AD BE EF AA AA 771 vcan0 241 [3] 30 00 00 772 vcan0 541 [5] 21 AA AA AA AA 773 vcan0 541 [8] 10 0A DE AD BE EF AA AA 774 vcan0 241 [3] 30 00 00 775 vcan0 541 [5] 21 AA AA AA AA''') 776 777pkts = sniff(opened_socket=CandumpReader(candump_fd), session=ISOTPSession, timeout=1) 778if not len(pkts): 779 s.failure_analysis() 780 raise Scapy_Exception("ERROR") 781 782assert len(pkts) == 12 783isotp = pkts[1] 784assert isotp.data == dhex("DE AD BE EF AA AA AA AA AA AA") 785assert (isotp.rx_id == 0x541) 786isotp = pkts[0] 787assert isotp.data == dhex("") 788assert (isotp.rx_id == 0x241) 789 790= Receive frames from CandumpReader with ISOTPSniffer and count 791* all flow control frames are detected as single frame with extended address 792 793candump_fd = BytesIO(b''' vcan0 541 [8] 10 0A DE AD BE EF AA AA 794 vcan0 241 [3] 30 00 00 795 vcan0 541 [5] 21 AA AA AA AA 796 vcan0 541 [8] 10 0A DE AD BE EF AA AA 797 vcan0 541 [8] 10 0A DE AD BE EF AA AA 798 vcan0 241 [3] 30 00 00 799 vcan0 541 [5] 21 AA AA AA AA 800 vcan0 541 [8] 10 0A DE AD BE EF AA AA 801 vcan0 241 [3] 30 00 00 802 vcan0 541 [5] 21 AA AA AA AA 803 vcan0 541 [8] 10 0A DE AD BE EF AA AA 804 vcan0 241 [3] 30 00 00 805 vcan0 541 [5] 21 AA AA AA AA 806 vcan0 541 [8] 10 0A DE AD BE EF AA AA 807 vcan0 241 [3] 30 00 00 808 vcan0 541 [5] 21 AA AA AA AA 809 vcan0 541 [8] 10 0A DE AD BE EF AA AA 810 vcan0 241 [3] 30 00 00 811 vcan0 541 [5] 21 AA AA AA AA''') 812 813pkts = sniff(opened_socket=CandumpReader(candump_fd), session=ISOTPSession, timeout=1, count=2) 814if not len(pkts): 815 s.failure_analysis() 816 raise Scapy_Exception("ERROR") 817 818assert len(pkts) == 2 819isotp = pkts[1] 820assert isotp.data == dhex("DE AD BE EF AA AA AA AA AA AA") 821assert (isotp.rx_id == 0x541) 822isotp = pkts[0] 823assert isotp.data == dhex("") 824assert (isotp.rx_id == 0x241) 825 826= Receive a two-frame ISOTP message 827 828with TestSocket(CAN) as isocan, ISOTPSoftSocket(isocan, tx_id=0x641, rx_id=0x241) as s, TestSocket(CAN) as cans: 829 cans.pair(isocan) 830 cans.send(CAN(identifier = 0x241, data = dhex("10 0B 01 02 03 04 05 06"))) 831 cans.send(CAN(identifier = 0x241, data = dhex("21 07 08 09 10 11"))) 832 pkts = s.sniff(count=1, timeout=1) 833 if not len(pkts): 834 s.failure_analysis() 835 raise Scapy_Exception("ERROR") 836 isotp = pkts[0] 837 assert isotp.data == dhex("01 02 03 04 05 06 07 08 09 10 11") 838 839= Check what happens when a CAN frame with wrong identifier gets received 840 841with TestSocket(CAN) as isocan, ISOTPSoftSocket(isocan, tx_id=0x641, rx_id=0x241) as s, TestSocket(CAN) as cans: 842 cans.pair(isocan) 843 cans.send(CAN(identifier = 0x141, data = dhex("05 01 02 03 04 05"))) 844 assert s.ins.rx_queue.empty() 845 846+ Testing ISOTPSoftSocket timeouts 847 848= Check if not sending the last CF will make the socket timeout 849with TestSocket(CAN) as isocan, ISOTPSoftSocket(isocan, tx_id=0x641, rx_id=0x241) as s, TestSocket(CAN) as cans: 850 cans.pair(isocan) 851 cans.send(CAN(identifier = 0x241, data = dhex("10 11 01 02 03 04 05 06"))) 852 cans.send(CAN(identifier = 0x241, data = dhex("21 07 08 09 0A 0B 0C 0D"))) 853 isotp = s.sniff(timeout=0.1) 854 855assert len(isotp) == 0 856 857= Check if not sending the first CF will make the socket timeout 858 859with TestSocket(CAN) as isocan, ISOTPSoftSocket(isocan, tx_id=0x641, rx_id=0x241) as s, TestSocket(CAN) as cans: 860 cans.pair(isocan) 861 cans.send(CAN(identifier = 0x241, data = dhex("10 11 01 02 03 04 05 06"))) 862 isotp = s.sniff(timeout=0.1) 863 864assert len(isotp) == 0 865 866= Check if not sending the first FC will make the socket timeout 867 868# drain log_stream 869log_stream.getvalue() 870 871isotp = ISOTP(data=dhex("01 02 03 04 05 06 07 08 09 0A")) 872 873with TestSocket(CAN) as isocan, ISOTPSoftSocket(isocan, tx_id=0x641, rx_id=0x241) as s, TestSocket(CAN) as cans: 874 cans.pair(isocan) 875 s.send(isotp) 876 time.sleep(1.3) 877 878assert "TX state was reset due to timeout" in log_stream.getvalue() 879 880= Check if not sending the second FC will make the socket timeout 881 882# drain log_stream 883log_stream.getvalue() 884 885isotp = ISOTP(data=b"\xa5" * 120) 886cans = TestSocket(CAN) 887isocan = TestSocket(CAN) 888cans.pair(isocan) 889 890acker = AsyncSniffer(store=False, opened_socket=cans, 891 prn=lambda x: cans.send(CAN(identifier = 0x241, data=dhex("30 04 00"))), 892 count=1, timeout=1) 893acker.start() 894with ISOTPSoftSocket(isocan, tx_id=0x641, rx_id=0x241) as s: 895 s.send(isotp) 896 time.sleep(1.3) 897 898acker.join(timeout=5) 899cans.close() 900isocan.close() 901 902assert "TX state was reset due to timeout" in log_stream.getvalue() 903 904= Check if reception of an overflow FC will make a send fail 905 906log_stream.getvalue() 907isotp = ISOTP(data=b"\xa5" * 120) 908cans = TestSocket(CAN) 909isocan = TestSocket(CAN) 910cans.pair(isocan) 911 912acker = AsyncSniffer(store=False, opened_socket=cans, 913 prn=lambda x: cans.send( 914 CAN(identifier = 0x241, data=dhex("32 00 00"))), 915 count=1, timeout=1) 916acker.start() 917 918with ISOTPSoftSocket(isocan, tx_id=0x641, rx_id=0x241) as s: 919 s.send(isotp) 920 time.sleep(1.3) 921 922acker.join(timeout=5) 923cans.close() 924isocan.close() 925 926assert "Overflow happened at the receiver side" in log_stream.getvalue() 927 928+ More complex operations 929 930= ISOTPSoftSocket sr1 931msg = ISOTP(b'\x11\x22\x33\x11\x22\x33\x11\x22\x33\x11\x22\x33') 932 933with TestSocket(CAN) as isocan_tx, ISOTPSoftSocket(isocan_tx, 0x123, 0x321) as sock_tx, \ 934 TestSocket(CAN) as isocan_rx, ISOTPSoftSocket(isocan_rx, 0x321, 0x123) as sock_rx: 935 isocan_rx.pair(isocan_tx) 936 sniffer = AsyncSniffer(opened_socket=sock_rx, timeout=1, count=1, prn=lambda x: sock_rx.send(msg)) 937 sniffer.start() 938 rx2 = sock_tx.sr1(msg, timeout=3, verbose=True) 939 sniffer.join(timeout=1) 940 rx = sniffer.results[0] 941 942assert rx == msg 943assert rx2 is not None 944assert rx2 == msg 945 946= ISOTPSoftSocket sniff 947 948msg = ISOTP(b'\x11\x22\x33\x11\x22\x33\x11\x22\x33\x11\x22\x33') 949with TestSocket(CAN) as isocan1, ISOTPSoftSocket(isocan1, 0x123, 0x321) as sock, \ 950 TestSocket(CAN) as isocan, ISOTPSoftSocket(isocan, 0x321, 0x123) as rx_sock: 951 isocan1.pair(isocan) 952 msg.data += b'0' 953 sock.send(msg) 954 msg.data += b'1' 955 sock.send(msg) 956 msg.data += b'2' 957 sock.send(msg) 958 msg.data += b'3' 959 sock.send(msg) 960 msg.data += b'4' 961 sock.send(msg) 962 rx = rx_sock.sniff(count=5, timeout=5) 963 964msg = ISOTP(b'\x11\x22\x33\x11\x22\x33\x11\x22\x33\x11\x22\x33') 965msg.data += b'0' 966assert rx[0] == msg 967msg.data += b'1' 968assert rx[1] == msg 969msg.data += b'2' 970assert rx[2] == msg 971msg.data += b'3' 972assert rx[3] == msg 973msg.data += b'4' 974assert rx[4] == msg 975 976+ ISOTPSoftSocket MITM attack tests 977 978= bridge and sniff with isotp soft sockets set up vcan0 and vcan1 for package forwarding vcan1 979succ = False 980 981with TestSocket(CAN) as can0_0, \ 982 TestSocket(CAN) as can0_1, \ 983 TestSocket(CAN) as can1_0, \ 984 TestSocket(CAN) as can1_1, \ 985 ISOTPSoftSocket(can0_0, tx_id=0x241, rx_id=0x641) as isoTpSocket0, \ 986 ISOTPSoftSocket(can1_0, tx_id=0x541, rx_id=0x141) as isoTpSocket1, \ 987 ISOTPSoftSocket(can0_1, tx_id=0x641, rx_id=0x241) as bSocket0, \ 988 ISOTPSoftSocket(can1_1, tx_id=0x141, rx_id=0x141) as bSocket1: 989 can0_0.pair(can0_1) 990 can1_1.pair(can1_0) 991 evt = threading.Event() 992 def forwarding(pkt): 993 global forwarded 994 forwarded += 1 995 return pkt 996 def bridge(): 997 global forwarded, succ 998 forwarded = 0 999 bridge_and_sniff(if1=bSocket0, if2=bSocket1, xfrm12=forwarding, xfrm21=forwarding, timeout=1.5, 1000 started_callback=evt.set, count=1) 1001 succ = True 1002 threadBridge = threading.Thread(target=bridge) 1003 threadBridge.start() 1004 evt.wait(timeout=5) 1005 packetsVCan1 = isoTpSocket1.sniff(timeout=1.5, count=1, started_callback=lambda: isoTpSocket0.send(ISOTP(b'Request'))) 1006 threadBridge.join(timeout=5) 1007 assert not threadBridge.is_alive() 1008 1009assert forwarded == 1 1010assert len(packetsVCan1) == 1 1011assert succ 1012 1013= bridge and sniff with isotp soft sockets and multiple long packets 1014 1015N = 3 1016T = 3 1017 1018succ = False 1019with TestSocket(CAN) as can0_0, \ 1020 TestSocket(CAN) as can0_1, \ 1021 TestSocket(CAN) as can1_0, \ 1022 TestSocket(CAN) as can1_1, \ 1023 ISOTPSoftSocket(can0_0, tx_id=0x241, rx_id=0x641) as isoTpSocket0, \ 1024 ISOTPSoftSocket(can1_0, tx_id=0x541, rx_id=0x141) as isoTpSocket1, \ 1025 ISOTPSoftSocket(can0_1, tx_id=0x641, rx_id=0x241) as bSocket0, \ 1026 ISOTPSoftSocket(can1_1, tx_id=0x141, rx_id=0x541) as bSocket1: 1027 can0_0.pair(can0_1) 1028 can1_1.pair(can1_0) 1029 evt = threading.Event() 1030 def forwarding(pkt): 1031 global forwarded 1032 forwarded += 1 1033 return pkt 1034 def bridge(): 1035 global forwarded, succ 1036 forwarded = 0 1037 bridge_and_sniff(if1=bSocket0, if2=bSocket1, xfrm12=forwarding, xfrm21=forwarding, 1038 timeout=T, count=N, started_callback=evt.set) 1039 succ = True 1040 threadBridge = threading.Thread(target=bridge) 1041 threadBridge.start() 1042 evt.wait(timeout=5) 1043 for _ in range(N): 1044 isoTpSocket0.send(ISOTP(b'RequestASDF1234567890')) 1045 packetsVCan1 = isoTpSocket1.sniff(timeout=T, count=N) 1046 threadBridge.join(timeout=5) 1047 1048assert not threadBridge.is_alive() 1049 1050assert forwarded == N 1051assert len(packetsVCan1) == N 1052assert succ 1053 1054= bridge and sniff with isotp soft sockets set up vcan0 and vcan1 for package change vcan1 1055 1056succ = False 1057with TestSocket(CAN) as can0_0, \ 1058 TestSocket(CAN) as can0_1, \ 1059 TestSocket(CAN) as can1_0, \ 1060 TestSocket(CAN) as can1_1, \ 1061 ISOTPSoftSocket(can0_0, tx_id=0x241, rx_id=0x641) as isoTpSocket0, \ 1062 ISOTPSoftSocket(can1_0, tx_id=0x641, rx_id=0x241) as isoTpSocket1, \ 1063 ISOTPSoftSocket(can0_1, tx_id=0x641, rx_id=0x241) as bSocket0, \ 1064 ISOTPSoftSocket(can1_1, tx_id=0x241, rx_id=0x641) as bSocket1: 1065 can0_0.pair(can0_1) 1066 can1_1.pair(can1_0) 1067 evt = threading.Event() 1068 def forwarding(pkt): 1069 pkt.data = 'changed' 1070 return pkt 1071 def bridge(): 1072 global succ 1073 bridge_and_sniff(if1=bSocket0, if2=bSocket1, xfrm12=forwarding, xfrm21=forwarding, timeout=3, 1074 started_callback=evt.set, count=1) 1075 succ = True 1076 threadBridge = threading.Thread(target=bridge) 1077 threadBridge.start() 1078 evt.wait(timeout=5) 1079 packetsVCan1 = isoTpSocket1.sniff(timeout=2, count=1, started_callback=lambda: isoTpSocket0.send(ISOTP(b'Request'))) 1080 threadBridge.join(timeout=5) 1081 assert not threadBridge.is_alive() 1082 1083assert len(packetsVCan1) == 1 1084assert packetsVCan1[0].data == b'changed' 1085assert succ 1086 1087= Two ISOTPSoftSockets at the same time, sending and receiving 1088 1089with TestSocket(CAN) as cs1, ISOTPSoftSocket(cs1, tx_id=0x641, rx_id=0x241) as s1, \ 1090 TestSocket(CAN) as cs2, ISOTPSoftSocket(cs2, tx_id=0x241, rx_id=0x641) as s2: 1091 cs1.pair(cs2) 1092 isotp = ISOTP(data=b"\x10\x25" * 43) 1093 s2.send(isotp) 1094 result = s1.sniff(count=1, timeout=5) 1095 1096assert len(result) == 1 1097assert result[0].data == isotp.data 1098 1099 1100= Two ISOTPSoftSockets at the same time, sending and receiving with tx_gap 1101 1102with TestSocket(CAN) as cs1, ISOTPSoftSocket(cs1, tx_id=0x641, rx_id=0x241, stmin=1) as s1, \ 1103 TestSocket(CAN) as cs2, ISOTPSoftSocket(cs2, tx_id=0x241, rx_id=0x641) as s2: 1104 cs1.pair(cs2) 1105 isotp = ISOTP(data=b"\x10\x25" * 43) 1106 s2.send(isotp) 1107 result = s1.sniff(count=1, timeout=5) 1108 1109assert len(result) == 1 1110assert result[0].data == isotp.data 1111 1112 1113= Two ISOTPSoftSockets at the same time, multiple sends/receives 1114def test(): 1115 with TestSocket(CAN) as cs1, ISOTPSoftSocket(cs1, tx_id=0x641, rx_id=0x241) as s1, \ 1116 TestSocket(CAN) as cs2, ISOTPSoftSocket(cs2, tx_id=0x241, rx_id=0x641) as s2: 1117 cs1.pair(cs2) 1118 for i in range(1, 40, 5): 1119 isotp = ISOTP(data=bytearray(range(i, i * 2))) 1120 s2.send(isotp) 1121 result = s1.sniff(count=8, timeout=5) 1122 print(result) 1123 for p in result: 1124 print(repr(p)) 1125 assert len(result) == 8 1126 1127test() 1128 1129= Send a single frame ISOTP message with padding 1130 1131with TestSocket(CAN) as cs1, ISOTPSoftSocket(cs1, tx_id=0x641, rx_id=0x241, padding=True) as s: 1132 with TestSocket(CAN) as cans: 1133 cs1.pair(cans) 1134 s.send(ISOTP(data=dhex("01"))) 1135 pkts = cans.sniff(timeout=1, count=1) 1136 if not len(pkts): 1137 s.failure_analysis() 1138 raise Scapy_Exception("ERROR") 1139 res = pkts[0] 1140 assert res.length == 8 1141 1142= Send a single frame ISOTP message with padding FD 1143 1144with TestSocket(CANFD) as cs1, ISOTPSoftSocket(cs1, tx_id=0x641, rx_id=0x241, padding=True, fd=True) as s: 1145 with TestSocket(CANFD) as cans: 1146 cs1.pair(cans) 1147 pl_sizes_testings = [1, 5, 7, 8, 9, 12, 15, 17, 20, 21, 27, 35, 40, 46, 50, 62] 1148 pl_sizes_expected = [8, 8, 8, 12, 12, 16, 20, 20, 24, 24, 32, 48, 48, 48, 64, 64] 1149 for i, pl_size in enumerate(pl_sizes_testings): 1150 s.send(dhex(" ".join(["%02X" % x for x in range(pl_size)]))) 1151 pkts = cans.sniff(timeout=1, count=1) 1152 if not len(pkts): 1153 s.failure_analysis() 1154 raise Scapy_Exception("ERROR") 1155 res = pkts[0] 1156 assert res.length == pl_sizes_expected[i] 1157 1158 1159= Send a two-frame ISOTP message with padding 1160 1161acks = TestSocket(CAN) 1162cans = TestSocket(CAN) 1163acks.pair(cans) 1164 1165def send_ack(x): 1166 acks.send(CAN(identifier = 0x241, data=dhex("30 00 00"))) 1167 1168acker = AsyncSniffer(opened_socket=acks, store=False, prn=send_ack, timeout=1, count=1) 1169acker.start() 1170 1171with TestSocket(CAN) as isocan, ISOTPSoftSocket(isocan, tx_id=0x641, rx_id=0x241, padding=True) as s: 1172 acks.pair(isocan) 1173 cans.pair(isocan) 1174 s.send(ISOTP(data=dhex("01 02 03 04 05 06 07 08"))) 1175 canpks = cans.sniff(timeout=1, count=3) 1176 1177acker.join(timeout=5) 1178canpks.sort(key=lambda x:x.identifier) 1179assert canpks[1].identifier == 0x641 1180assert canpks[1].data == dhex("10 08 01 02 03 04 05 06") 1181assert canpks[0].identifier == 0x241 1182assert canpks[0].data == dhex("30 00 00") 1183assert canpks[2].identifier == 0x641 1184assert canpks[2].data == dhex("21 07 08 CC CC CC CC CC") 1185 1186 1187= Receive a padded single frame ISOTP message with padding disabled 1188with TestSocket(CAN) as isocan, ISOTPSoftSocket(isocan, tx_id=0x641, rx_id=0x241, padding=False) as s: 1189 with TestSocket(CAN) as cans: 1190 cans.pair(isocan) 1191 cans.send(CAN(identifier=0x241, data=dhex("02 05 06 00 00 00 00 00"))) 1192 pkts = s.sniff(count=1, timeout=1) 1193 if not len(pkts): 1194 s.failure_analysis() 1195 raise Scapy_Exception("ERROR") 1196 res = pkts[0] 1197 assert res.data == dhex("05 06") 1198 1199 1200= Receive a padded single frame ISOTP message with padding enabled 1201with TestSocket(CAN) as isocan, ISOTPSoftSocket(isocan, tx_id=0x641, rx_id=0x241, padding=True) as s: 1202 with TestSocket(CAN) as cans: 1203 cans.pair(isocan) 1204 cans.send(CAN(identifier=0x241, data=dhex("02 05 06 00 00 00 00 00"))) 1205 pkts = s.sniff(count=1, timeout=1) 1206 if not len(pkts): 1207 s.failure_analysis() 1208 raise Scapy_Exception("ERROR") 1209 res = pkts[0] 1210 assert res.data == dhex("05 06") 1211 1212 1213= Receive a non-padded single frame ISOTP message with padding enabled 1214with TestSocket(CAN) as isocan, ISOTPSoftSocket(isocan, tx_id=0x641, rx_id=0x241, padding=True) as s: 1215 with TestSocket(CAN) as cans: 1216 cans.pair(isocan) 1217 cans.send(CAN(identifier=0x241, data=dhex("02 05 06"))) 1218 pkts = s.sniff(count=1, timeout=2) 1219 if not len(pkts): 1220 s.failure_analysis() 1221 raise Scapy_Exception("ERROR") 1222 res = pkts[0] 1223 assert res.data == dhex("05 06") 1224 1225 1226= Receive a padded two-frame ISOTP message with padding enabled 1227with TestSocket(CAN) as isocan, ISOTPSoftSocket(isocan, tx_id=0x641, rx_id=0x241, padding=True) as s: 1228 with TestSocket(CAN) as cans: 1229 cans.pair(isocan) 1230 cans.send(CAN(identifier=0x241, data=dhex("10 09 01 02 03 04 05 06"))) 1231 cans.send(CAN(identifier=0x241, data=dhex("21 07 08 09 00 00 00 00"))) 1232 pkts = s.sniff(count=1, timeout=1) 1233 if not len(pkts): 1234 s.failure_analysis() 1235 raise Scapy_Exception("ERROR") 1236 res = pkts[0] 1237 assert res.data == dhex("01 02 03 04 05 06 07 08 09") 1238 1239= Receive a padded two-frame ISOTP message with padding disabled 1240with TestSocket(CAN) as isocan, ISOTPSoftSocket(isocan, tx_id=0x641, rx_id=0x241, padding=False) as s: 1241 with TestSocket(CAN) as cans: 1242 cans.pair(isocan) 1243 cans.send(CAN(identifier=0x241, data=dhex("10 09 01 02 03 04 05 06"))) 1244 cans.send(CAN(identifier=0x241, data=dhex("21 07 08 09 00 00 00 00"))) 1245 pkts = s.sniff(count=1, timeout=1) 1246 if not len(pkts): 1247 s.failure_analysis() 1248 raise Scapy_Exception("ERROR") 1249 res = pkts[0] 1250 res.show() 1251 assert res.data == dhex("01 02 03 04 05 06 07 08 09") 1252 1253+ Cleanup 1254 1255= Delete testsockets 1256 1257cleanup_testsockets() 1258 1259TimeoutScheduler.clear() 1260 1261log_runtime.removeHandler(handler) 1262