• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2#
3#  Copyright (c) 2016, The OpenThread Authors.
4#  All rights reserved.
5#
6#  Redistribution and use in source and binary forms, with or without
7#  modification, are permitted provided that the following conditions are met:
8#  1. Redistributions of source code must retain the above copyright
9#     notice, this list of conditions and the following disclaimer.
10#  2. Redistributions in binary form must reproduce the above copyright
11#     notice, this list of conditions and the following disclaimer in the
12#     documentation and/or other materials provided with the distribution.
13#  3. Neither the name of the copyright holder nor the
14#     names of its contributors may be used to endorse or promote products
15#     derived from this software without specific prior written permission.
16#
17#  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18#  AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19#  IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20#  ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
21#  LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22#  CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23#  SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24#  INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25#  CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26#  ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27#  POSSIBILITY OF SUCH DAMAGE.
28#
29
30import unittest
31
32import command
33import config
34import thread_cert
35from pktverify.packet_verifier import PacketVerifier
36
37LEADER = 1
38DUT_ROUTER1 = 2
39ROUTER2 = 3
40ROUTER3 = 4
41
42# Test Purpose and Description:
43# -----------------------------
44# The purpose of this test case is to ensure that the DUT routes traffic properly
45# when link qualities between the nodes are adjusted.
46#
47# Test Topology:
48# -------------
49#            Leader
50#            /    \
51#     Router_2 - Router_1(DUT)
52#                   |
53#                Router_3
54#
55# DUT Types:
56# ----------
57#  Router
58
59
60class Cert_5_3_5_RoutingLinkQuality(thread_cert.TestCase):
61    USE_MESSAGE_FACTORY = False
62
63    TOPOLOGY = {
64        LEADER: {
65            'name': 'LEADER',
66            'mode': 'rdn',
67            'allowlist': [DUT_ROUTER1, ROUTER2]
68        },
69        DUT_ROUTER1: {
70            'name': 'ROUTER_1',
71            'mode': 'rdn',
72            'allowlist': [LEADER, ROUTER2, ROUTER3]
73        },
74        ROUTER2: {
75            'name': 'ROUTER_2',
76            'mode': 'rdn',
77            'allowlist': [LEADER, DUT_ROUTER1]
78        },
79        ROUTER3: {
80            'name': 'ROUTER_3',
81            'mode': 'rdn',
82            'allowlist': [DUT_ROUTER1]
83        },
84    }
85
86    def test(self):
87        # 1
88        self.nodes[LEADER].start()
89        self.simulator.go(config.LEADER_STARTUP_DELAY)
90        self.assertEqual(self.nodes[LEADER].get_state(), 'leader')
91
92        for router in range(DUT_ROUTER1, ROUTER3 + 1):
93            self.nodes[router].start()
94        self.simulator.go(config.ROUTER_STARTUP_DELAY)
95
96        for router in range(DUT_ROUTER1, ROUTER3 + 1):
97            self.assertEqual(self.nodes[router].get_state(), 'router')
98
99        self.collect_rlocs()
100        self.collect_rloc16s()
101
102        # 2 & 3
103        leader_rloc = self.nodes[LEADER].get_ip6_address(config.ADDRESS_TYPE.RLOC)
104        self.assertTrue(self.nodes[ROUTER3].ping(leader_rloc))
105
106        # 4 & 5
107        self.nodes[LEADER].add_allowlist(self.nodes[DUT_ROUTER1].get_addr64(), config.RSSI['LINK_QULITY_1'])
108        self.nodes[DUT_ROUTER1].add_allowlist(self.nodes[LEADER].get_addr64(), config.RSSI['LINK_QULITY_1'])
109        self.simulator.go(3 * config.MAX_ADVERTISEMENT_INTERVAL)
110
111        self.assertTrue(self.nodes[ROUTER3].ping(leader_rloc))
112
113        # 6 & 7
114        self.nodes[LEADER].add_allowlist(self.nodes[DUT_ROUTER1].get_addr64(), config.RSSI['LINK_QULITY_2'])
115        self.nodes[DUT_ROUTER1].add_allowlist(self.nodes[LEADER].get_addr64(), config.RSSI['LINK_QULITY_2'])
116        self.simulator.go(3 * config.MAX_ADVERTISEMENT_INTERVAL)
117
118        self.assertTrue(self.nodes[ROUTER3].ping(leader_rloc))
119
120        # 8 & 9
121        self.nodes[LEADER].add_allowlist(self.nodes[DUT_ROUTER1].get_addr64(), config.RSSI['LINK_QULITY_0'])
122        self.nodes[DUT_ROUTER1].add_allowlist(self.nodes[LEADER].get_addr64(), config.RSSI['LINK_QULITY_0'])
123        self.simulator.go(3 * config.MAX_ADVERTISEMENT_INTERVAL)
124
125        self.assertTrue(self.nodes[ROUTER3].ping(leader_rloc))
126
127    def verify(self, pv):
128        pkts = pv.pkts
129        pv.summary.show()
130
131        LEADER = pv.vars['LEADER']
132        LEADER_RLOC = pv.vars['LEADER_RLOC']
133        LEADER_RLOC16 = pv.vars['LEADER_RLOC16']
134        ROUTER_1 = pv.vars['ROUTER_1']
135        ROUTER_1_RLOC = pv.vars['ROUTER_1_RLOC']
136        ROUTER_1_RLOC16 = pv.vars['ROUTER_1_RLOC16']
137        ROUTER_2 = pv.vars['ROUTER_2']
138        ROUTER_2_RLOC16 = pv.vars['ROUTER_2_RLOC16']
139        ROUTER_2_RLOC = pv.vars['ROUTER_2_RLOC']
140        ROUTER_3 = pv.vars['ROUTER_3']
141        ROUTER_3_RLOC = pv.vars['ROUTER_3_RLOC']
142        MM = pv.vars['MM_PORT']
143
144        # Step 1: Ensure topology is formed correctly
145        for i in range(1, 4):
146            with pkts.save_index():
147                pv.verify_attached('ROUTER_%d' % i)
148
149        # Step 2: Modify the link quality between the DUT and the Leader to be 3
150        # Step 3: Router_3 sends an ICMPv6 Echo Request to the Leader
151        #         The ICMPv6 Echo Request MUST take the shortest path:
152        #             Router_3 -> DUT -> Leader
153        #         The hopsLft field of the 6LoWPAN Mesh Header MUST be greater than
154        #         the route cost to the destination
155
156        _pkt = pkts.filter_ping_request().\
157            filter_wpan_src64(ROUTER_3).\
158            filter_ipv6_dst(LEADER_RLOC).\
159            must_next()
160        _pkt.must_verify(lambda p: p.lowpan.mesh.hops > 2)
161        pkts.filter_ping_request(identifier=_pkt.icmpv6.echo.identifier).\
162            filter_wpan_src64(ROUTER_1).\
163            filter_wpan_dst16(LEADER_RLOC16).\
164            must_next()
165        pkts.filter_ping_reply(identifier=_pkt.icmpv6.echo.identifier).\
166            filter_wpan_src64(LEADER).\
167            filter_ipv6_dst(ROUTER_3_RLOC).\
168            must_next()
169
170        # Step 4: Modify the link quality between the DUT and the Leader to be 1
171        # Step 5: Router_3 sends an ICMPv6 Echo Request to the Leader
172        #         The ICMPv6 Echo Request MUST take the shortest path:
173        #             Router_3 -> DUT -> Router_2 -> Leader
174        #         The hopsLft field of the 6LoWPAN Mesh Header MUST be greater than
175        #         the route cost to the destination
176
177        _pkt = pkts.filter_ping_request().\
178            filter_wpan_src64(ROUTER_3).\
179            filter_ipv6_dst(LEADER_RLOC).\
180            must_next()
181        _pkt.must_verify(lambda p: p.lowpan.mesh.hops > 3)
182        pkts.filter_ping_request(identifier=_pkt.icmpv6.echo.identifier).\
183            filter_wpan_src64(ROUTER_1).\
184            filter_wpan_dst16(ROUTER_2_RLOC16).\
185            must_next()
186        pkts.filter_ping_request(identifier=_pkt.icmpv6.echo.identifier).\
187            filter_wpan_src64(ROUTER_2).\
188            filter_wpan_dst16(LEADER_RLOC16).\
189            must_next()
190        pkts.filter_ping_reply(identifier=_pkt.icmpv6.echo.identifier).\
191            filter_wpan_src64(LEADER).\
192            filter_ipv6_dst(ROUTER_3_RLOC).\
193            must_next()
194
195        # Step 6: Modify the link quality between the DUT and the Leader to be 2
196        # Step 7: Router_3 sends an ICMPv6 Echo Request to the Leader
197        #         The ICMPv6 Echo Request MUST take the shortest path:
198        #             Router_3 -> DUT -> Leader
199        #         The hopsLft field of the 6LoWPAN Mesh Header MUST be greater than
200        #         the route cost to the destination
201
202        _pkt = pkts.filter_ping_request().\
203            filter_wpan_src64(ROUTER_3).\
204            filter_ipv6_dst(LEADER_RLOC).\
205            must_next()
206        _pkt.must_verify(lambda p: p.lowpan.mesh.hops > 2)
207        pkts.filter_ping_request(identifier=_pkt.icmpv6.echo.identifier).\
208            filter_wpan_src64(ROUTER_1).\
209            filter_wpan_dst16(LEADER_RLOC16).\
210            must_next()
211        pkts.filter_ping_reply(identifier=_pkt.icmpv6.echo.identifier).\
212            filter_wpan_src64(LEADER).\
213            filter_ipv6_dst(ROUTER_3_RLOC).\
214            must_next()
215
216        # Step 8: Modify the link quality between the DUT and the Leader to be 0
217        # Step 9: Router_3 sends an ICMPv6 Echo Request to the Leader
218        #         The ICMPv6 Echo Request MUST take the shortest path:
219        #             Router_3 -> DUT -> Router_2 -> Leader
220        #         The hopsLft field of the 6LoWPAN Mesh Header MUST be greater than
221        #         the route cost to the destination
222
223        _pkt = pkts.filter_ping_request().\
224            filter_wpan_src64(ROUTER_3).\
225            filter_ipv6_dst(LEADER_RLOC).\
226            must_next()
227        _pkt.must_verify(lambda p: p.lowpan.mesh.hops > 3)
228        pkts.filter_ping_request(identifier=_pkt.icmpv6.echo.identifier).\
229            filter_wpan_src64(ROUTER_1).\
230            filter_wpan_dst16(ROUTER_2_RLOC16).\
231            must_next()
232        pkts.filter_ping_request(identifier=_pkt.icmpv6.echo.identifier).\
233            filter_wpan_src64(ROUTER_2).\
234            filter_wpan_dst16(LEADER_RLOC16).\
235            must_next()
236        pkts.filter_ping_reply(identifier=_pkt.icmpv6.echo.identifier).\
237            filter_wpan_src64(LEADER).\
238            filter_ipv6_dst(ROUTER_3_RLOC).\
239            must_next()
240
241
242if __name__ == '__main__':
243    unittest.main()
244