• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1% Regression tests for the XCP_CAN
2~ scanner
3
4+ Basic operations
5
6= Imports
7
8from test.testsocket import TestSocket, cleanup_testsockets
9
10+ Tests XCPonCAN Scanner
11
12= Load modules
13
14load_contrib("automotive.xcp.xcp", globals_dict=globals())
15load_contrib("automotive.xcp.scanner", globals_dict=globals())
16
17
18= xcp can scanner broadcast ID-Range
19
# Identifiers handed to the scanner; the emulated ECU below runs one round per entry.
id_range = range(50, 53)
# (request identifier, response identifier) pairs the scan is expected to report.
slave_id_1, response_id_1 = 10, 11
slave_id_2, response_id_2 = 20, 21

# Slave-ID responses: each is sent from the slave's response identifier and
# carries the slave's own identifier in the can_identifier field.
slave_1_response = (
    XCPOnCAN(identifier=response_id_1)
    / CTOResponse(packet_code=0xFF)
    / TransportLayerCmdGetSlaveIdResponse(can_identifier=slave_id_1)
)
slave_2_response = (
    XCPOnCAN(identifier=response_id_2)
    / CTOResponse(packet_code=0xFF)
    / TransportLayerCmdGetSlaveIdResponse(can_identifier=slave_id_2)
)

# XCP responses from other identifiers (30 and 40) that are not slave-ID
# responses; the assertions of this test expect them not to show up as results.
random_xcp_response_1 = (
    XCPOnCAN(identifier=30)
    / CTOResponse(packet_code=0xFF)
    / GenericResponse(b"\x00\x00")
)
random_xcp_response_2 = (
    XCPOnCAN(identifier=40)
    / CTOResponse(packet_code=0xFF)
    / GenericResponse(b"\x00\x00")
)

# sock1 is used by the emulated ECU, sock2 by the scanner; they are paired with each other.
sock1 = TestSocket(XCPOnCAN)
sock2 = TestSocket(XCPOnCAN)
sock1.pair(sock2)
35
def ecu():
    """Emulated bus peer for the broadcast scan.
    For each of the three rounds (50, 51, 52) it waits up to 2 s for a single
    frame on sock1 and then sends that round's canned frames. The received
    frame is never inspected; only the round counter selects what is sent.
    """
    # Round 50: three plain CAN frames on identifier 0x90.
    stray_can_frames = [CAN(identifier=0x90, data=payload) for payload in (b'\x01\x02\x03', b'\x05\x02\x03', b'\xff\x05\x03')]
    # Round 51: unrelated XCP responses. Round 52: the two slave-ID responses.
    frames_by_round = {
        50: stray_can_frames,
        51: [random_xcp_response_1, random_xcp_response_2],
        52: [slave_1_response, slave_2_response],
    }
    for round_id in range(50, 53):
        sock1.sniff(count=1, store=False, timeout=2)
        for frame in frames_by_round[round_id]:
            sock1.send(frame)
49
50
# Start the emulated ECU in the background before the scanner begins.
thread = threading.Thread(target=ecu)
thread.start()

scanner = XCPOnCANScanner(
    sock2,
    id_range=id_range,
    sniff_time=0.5,
)
result = scanner.scan_with_get_slave_id()
# Wait (at most 3 s) for the ECU thread, then release both sockets.
thread.join(timeout=3)
sock1.close()
sock2.close()
# Exactly the two slaves must be reported, in the order their responses were sent.
assert len(result) == 2
assert (result[0].request_id, result[0].response_id) == (slave_id_1, response_id_1)
assert (result[1].request_id, result[1].response_id) == (slave_id_2, response_id_2)
64
65
66= xcp can scanner connect ID-range
# Identifiers handed to the scanner; the emulated ECU below runs one round per entry.
id_range = range(50, 53)
# The one (request identifier, response identifier) pair the scan is expected to report.
slave_id, response_id = 52, 11

# Positive connect response, sent from the slave's response identifier.
connect_response = (
    XCPOnCAN(identifier=response_id)
    / CTOResponse(packet_code=0xFF)
    / ConnectPositiveResponse()
)

# XCP responses from other identifiers (30 and 40) with generic payloads;
# the assertions of this test expect them not to show up as results.
random_xcp_response_1 = (
    XCPOnCAN(identifier=30)
    / CTOResponse(packet_code=0xFF)
    / GenericResponse(b"\x00\x00")
)
random_xcp_response_2 = (
    XCPOnCAN(identifier=40)
    / CTOResponse(packet_code=0xFF)
    / GenericResponse(b"\x10")
)

# sock1 is used by the emulated ECU, sock2 by the scanner; they are paired with each other.
sock1 = TestSocket(XCPOnCAN)
sock2 = TestSocket(XCPOnCAN)
sock1.pair(sock2)
79
80
def ecu():
    """Emulated bus peer for the connect scan.
    For each of the three rounds (50, 51, 52) it waits up to 2 s for a single
    frame on sock1 and then sends the frames planned for that round. The
    received frame is never inspected; only the round counter selects what is
    sent. The connect response is sent only in the round equal to slave_id.
    """
    # Ordered (trigger round, frames) plan; every matching entry fires, so the
    # entries behave like independent checks rather than an exclusive switch.
    plan = [
        (50, [CAN(identifier=0x90, data=b'\x01\x02\x03'), CAN(identifier=0x90, data=b'\xff\x05\x03')]),
        (51, [CAN(identifier=0x90, data=b'\x05\x02\x03'), random_xcp_response_1, random_xcp_response_2]),
        (slave_id, [CAN(identifier=0x90, data=b'\xff\x05\x03'), connect_response]),
    ]
    for round_id in range(50, 53):
        sock1.sniff(count=1, store=False, timeout=2)
        for trigger_id, frames in plan:
            if trigger_id == round_id:
                for frame in frames:
                    sock1.send(frame)
94
95
# Start the emulated ECU in the background before the scanner begins.
thread = threading.Thread(target=ecu)
thread.start()

scanner = XCPOnCANScanner(
    sock2,
    id_range=id_range,
    sniff_time=0.5,
)
result = scanner.scan_with_connect()
# Wait (at most 3 s) for the ECU thread, then release both sockets.
thread.join(timeout=3)
sock1.close()
sock2.close()

# Only the identifier answered with the connect response may be reported.
assert len(result) == 1
assert (result[0].request_id, result[0].response_id) == (slave_id, response_id)
108
109
110+ Cleanup
111
112= Delete TestSockets
113
# Final teardown step of the campaign. Presumably releases any TestSocket
# objects still registered by the tests above -- confirm in test.testsocket.
114cleanup_testsockets()
115