% Regression tests for the XCP_CAN
~ scanner

+ Basic operations

= Imports

# threading is imported explicitly so the tests below do not depend on the
# name leaking in from the surrounding test-session namespace.
import threading

from test.testsocket import TestSocket, cleanup_testsockets

+ Tests XCPonCAN Scanner

= modules

load_contrib("automotive.xcp.xcp", globals_dict=globals())
load_contrib("automotive.xcp.scanner", globals_dict=globals())


= xcp can scanner broadcast ID-Range

# Identifiers the scanner probes; the simulated ECU answers one probe per entry.
id_range = range(50, 53)
slave_id_1 = 10
response_id_1 = 11
slave_id_2 = 20
response_id_2 = 21

# The two GET_SLAVE_ID answers the scanner is expected to report.
slave_1_response = XCPOnCAN(identifier=response_id_1) / CTOResponse(packet_code=0xFF) / TransportLayerCmdGetSlaveIdResponse(can_identifier=slave_id_1)
slave_2_response = XCPOnCAN(identifier=response_id_2) / CTOResponse(packet_code=0xFF) / TransportLayerCmdGetSlaveIdResponse(can_identifier=slave_id_2)

# XCP frames that are not GET_SLAVE_ID answers; the asserts below expect them to be ignored.
random_xcp_response_1 = XCPOnCAN(identifier=30) / CTOResponse(packet_code=0xFF) / GenericResponse(b"\x00\x00")
random_xcp_response_2 = XCPOnCAN(identifier=40) / CTOResponse(packet_code=0xFF) / GenericResponse(b"\x00\x00")

sock1 = TestSocket(XCPOnCAN)
sock2 = TestSocket(XCPOnCAN)
sock1.pair(sock2)

def ecu():
    """Simulated bus peer: wait for one frame per probed ID, then send the scripted replies."""
    # The received frame is discarded (store=False), so replies are selected
    # purely by probe order, which is assumed to follow id_range.
    replies = {
        50: [
            CAN(identifier=0x90, data=b'\x01\x02\x03'),
            CAN(identifier=0x90, data=b'\x05\x02\x03'),
            CAN(identifier=0x90, data=b'\xff\x05\x03'),
        ],
        51: [random_xcp_response_1, random_xcp_response_2],
        52: [slave_1_response, slave_2_response],
    }
    for probe_id in id_range:
        sock1.sniff(count=1, store=False, timeout=2)
        for reply in replies.get(probe_id, ()):
            sock1.send(reply)


thread = threading.Thread(target=ecu)
thread.start()

scanner = XCPOnCANScanner(sock2, id_range=id_range, sniff_time=0.5)
result = scanner.scan_with_get_slave_id()
thread.join(timeout=3)
sock1.close()
sock2.close()
assert len(result) == 2
assert result[0].request_id == slave_id_1
assert result[0].response_id == response_id_1
assert result[1].request_id == slave_id_2
assert result[1].response_id == response_id_2


= xcp can scanner connect ID-range
# Identifiers the scanner probes; only the probe for slave_id gets a CONNECT answer.
id_range = range(50, 53)
slave_id = 52
response_id = 11

connect_response = XCPOnCAN(identifier=response_id) / CTOResponse(packet_code=0xFF) / ConnectPositiveResponse()

# XCP frames that are not CONNECT answers; the asserts below expect them to be ignored.
random_xcp_response_1 = XCPOnCAN(identifier=30) / CTOResponse(packet_code=0xFF) / GenericResponse(b"\x00\x00")
random_xcp_response_2 = XCPOnCAN(identifier=40) / CTOResponse(packet_code=0xFF) / GenericResponse(b"\x10")

sock1 = TestSocket(XCPOnCAN)
sock2 = TestSocket(XCPOnCAN)
sock1.pair(sock2)


def ecu():
    """Simulated bus peer: wait for one frame per probed ID, then send the scripted replies."""
    # The received frame is discarded (store=False), so replies are selected
    # purely by probe order, which is assumed to follow id_range.
    replies = {
        50: [
            CAN(identifier=0x90, data=b'\x01\x02\x03'),
            CAN(identifier=0x90, data=b'\xff\x05\x03'),
        ],
        51: [
            CAN(identifier=0x90, data=b'\x05\x02\x03'),
            random_xcp_response_1,
            random_xcp_response_2,
        ],
        slave_id: [
            CAN(identifier=0x90, data=b'\xff\x05\x03'),
            connect_response,
        ],
    }
    for probe_id in id_range:
        sock1.sniff(count=1, store=False, timeout=2)
        for reply in replies.get(probe_id, ()):
            sock1.send(reply)


thread = threading.Thread(target=ecu)
thread.start()

scanner = XCPOnCANScanner(sock2, id_range=id_range, sniff_time=0.5)
result = scanner.scan_with_connect()
thread.join(timeout=3)
sock1.close()
sock2.close()

assert len(result) == 1
assert result[0].request_id == slave_id
assert result[0].response_id == response_id


+ Cleanup

= Delete TestSockets

cleanup_testsockets()