% Regression tests for the XCP_CAN ~ scanner + Basic operations = Imports from test.testsocket import TestSocket, cleanup_testsockets + Tests XCPonCAN Scanner = modules load_contrib("automotive.xcp.xcp", globals_dict=globals()) load_contrib("automotive.xcp.scanner", globals_dict=globals()) = xcp can scanner broadcast ID-Range id_range = range(50, 53) slave_id_1 = 10 response_id_1 = 11 slave_id_2 = 20 response_id_2 = 21 slave_1_response = XCPOnCAN(identifier=response_id_1) / CTOResponse(packet_code=0xFF) / TransportLayerCmdGetSlaveIdResponse(can_identifier=slave_id_1) slave_2_response = XCPOnCAN(identifier=response_id_2) / CTOResponse(packet_code=0xFF) / TransportLayerCmdGetSlaveIdResponse(can_identifier=slave_id_2) random_xcp_response_1 = XCPOnCAN(identifier=30) / CTOResponse(packet_code=0xFF) / GenericResponse(b"\x00\x00") random_xcp_response_2 = XCPOnCAN(identifier=40) / CTOResponse(packet_code=0xFF) / GenericResponse(b"\x00\x00") sock1 = TestSocket(XCPOnCAN) sock2 = TestSocket(XCPOnCAN) sock1.pair(sock2) def ecu(): for i in range(50, 53): sock1.sniff(count=1, store=False, timeout=2) if i == 50: sock1.send(CAN(identifier=0x90, data=b'\x01\x02\x03')) sock1.send(CAN(identifier=0x90, data=b'\x05\x02\x03')) sock1.send(CAN(identifier=0x90, data=b'\xff\x05\x03')) if i == 51: sock1.send(random_xcp_response_1) sock1.send(random_xcp_response_2) if i == 52: sock1.send(slave_1_response) sock1.send(slave_2_response) thread = threading.Thread(target=ecu) thread.start() scanner = XCPOnCANScanner(sock2, id_range=id_range, sniff_time=0.5) result = scanner.scan_with_get_slave_id() thread.join(timeout=3) sock1.close() sock2.close() assert len(result) == 2 assert result[0].request_id == slave_id_1 assert result[0].response_id == response_id_1 assert result[1].request_id == slave_id_2 assert result[1].response_id == response_id_2 = xcp can scanner connect ID-range id_range = range(50, 53) slave_id = 52 response_id = 11 connect_response = XCPOnCAN(identifier=response_id) / CTOResponse(packet_code=0xFF) / ConnectPositiveResponse() random_xcp_response_1 = XCPOnCAN(identifier=30) / CTOResponse(packet_code=0xFF) / GenericResponse(b"\x00\x00") random_xcp_response_2 = XCPOnCAN(identifier=40) / CTOResponse(packet_code=0xFF) / GenericResponse(b"\x10") sock1 = TestSocket(XCPOnCAN) sock2 = TestSocket(XCPOnCAN) sock1.pair(sock2) def ecu(): for i in range(50, 53): sock1.sniff(count=1, store=False, timeout=2) if i == 50: sock1.send(CAN(identifier=0x90, data=b'\x01\x02\x03')) sock1.send(CAN(identifier=0x90, data=b'\xff\x05\x03')) if i == 51: sock1.send(CAN(identifier=0x90, data=b'\x05\x02\x03')) sock1.send(random_xcp_response_1) sock1.send(random_xcp_response_2) if i == slave_id: sock1.send(CAN(identifier=0x90, data=b'\xff\x05\x03')) sock1.send(connect_response) thread = threading.Thread(target=ecu) thread.start() scanner = XCPOnCANScanner(sock2, id_range=id_range, sniff_time=0.5) result = scanner.scan_with_connect() thread.join(timeout=3) sock1.close() sock2.close() assert len(result) == 1 assert result[0].request_id == slave_id assert result[0].response_id == response_id + Cleanup = Delete TestSockets cleanup_testsockets()