#!/usr/bin/env python3
#
#  Copyright (c) 2016, The OpenThread Authors.
#  All rights reserved.
#
#  Redistribution and use in source and binary forms, with or without
#  modification, are permitted provided that the following conditions are met:
#  1. Redistributions of source code must retain the above copyright
#     notice, this list of conditions and the following disclaimer.
#  2. Redistributions in binary form must reproduce the above copyright
#     notice, this list of conditions and the following disclaimer in the
#     documentation and/or other materials provided with the distribution.
#  3. Neither the name of the copyright holder nor the
#     names of its contributors may be used to endorse or promote products
#     derived from this software without specific prior written permission.
#
#  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
#  AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
#  IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
#  ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
#  LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
#  CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
#  SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
#  INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
#  CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
#  ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
#  POSSIBILITY OF SUCH DAMAGE.
#

import unittest

import command
import config
import thread_cert
from pktverify.packet_verifier import PacketVerifier

LEADER = 1
DUT_ROUTER1 = 2
ROUTER2 = 3
ROUTER3 = 4

# Test Purpose and Description:
# -----------------------------
# The purpose of this test case is to ensure that the DUT routes traffic properly
# when link qualities between the nodes are adjusted.
#
# Test Topology:
# -------------
#            Leader
#            /    \
#     Router_2 - Router_1(DUT)
#                    |
#                 Router_3
#
# DUT Types:
# ----------
#  Router


class Cert_5_3_5_RoutingLinkQuality(thread_cert.TestCase):
    """Check that the DUT re-routes Router_3 -> Leader traffic as link quality changes.

    `test` drives the simulated network: it forms the topology, then pings
    the Leader from Router_3 once with the initial links and once after each
    change of the Leader <-> DUT link quality. `verify` then walks the packet
    capture and checks which path every Echo Request took.
    """
    USE_MESSAGE_FACTORY = False

    TOPOLOGY = {
        LEADER: {
            'name': 'LEADER',
            'mode': 'rdn',
            'allowlist': [DUT_ROUTER1, ROUTER2]
        },
        DUT_ROUTER1: {
            'name': 'ROUTER_1',
            'mode': 'rdn',
            'allowlist': [LEADER, ROUTER2, ROUTER3]
        },
        ROUTER2: {
            'name': 'ROUTER_2',
            'mode': 'rdn',
            'allowlist': [LEADER, DUT_ROUTER1]
        },
        ROUTER3: {
            'name': 'ROUTER_3',
            'mode': 'rdn',
            'allowlist': [DUT_ROUTER1]
        },
    }

    def test(self):
        """Form the network and ping the Leader from Router_3 at each link quality."""
        # 1
        self.nodes[LEADER].start()
        self.simulator.go(config.LEADER_STARTUP_DELAY)
        self.assertEqual(self.nodes[LEADER].get_state(), 'leader')

        # Routers are started one at a time, each given time to attach before
        # the next one starts: Router_3's allowlist only contains the DUT, so
        # it has nobody to attach through until the DUT itself is up.
        for router in range(DUT_ROUTER1, ROUTER3 + 1):
            self.nodes[router].start()
            self.simulator.go(config.ROUTER_STARTUP_DELAY)

        for router in range(DUT_ROUTER1, ROUTER3 + 1):
            self.assertEqual(self.nodes[router].get_state(), 'router')

        self.collect_rlocs()
        self.collect_rloc16s()

        # 2 & 3: ping with the links as initially formed.
        leader_rloc = self.nodes[LEADER].get_ip6_address(config.ADDRESS_TYPE.RLOC)
        self.assertTrue(self.nodes[ROUTER3].ping(leader_rloc), 'steps 2 & 3: ping from ROUTER_3 to LEADER failed')

        # 4 - 9: change the Leader <-> DUT link quality on both ends, let the
        # routers exchange advertisements, then ping again.
        # NOTE: the 'LINK_QULITY_*' spelling is the one this test has always
        # used to index config.RSSI; do not "correct" it here on its own.
        link_quality_steps = (
            ('4 & 5', 'LINK_QULITY_1'),
            ('6 & 7', 'LINK_QULITY_2'),
            ('8 & 9', 'LINK_QULITY_0'),
        )
        for steps, rssi_key in link_quality_steps:
            rssi = config.RSSI[rssi_key]
            self.nodes[LEADER].add_allowlist(self.nodes[DUT_ROUTER1].get_addr64(), rssi)
            self.nodes[DUT_ROUTER1].add_allowlist(self.nodes[LEADER].get_addr64(), rssi)
            self.simulator.go(3 * config.MAX_ADVERTISEMENT_INTERVAL)

            self.assertTrue(self.nodes[ROUTER3].ping(leader_rloc),
                            'steps %s: ping from ROUTER_3 to LEADER failed' % steps)

    def verify(self, pv):
        """Check the captured packets against the expected route of every ping.

        Args:
            pv: the packet verifier handed in by the test framework; this
                method uses its `pkts`, `summary` and `vars` attributes and
                its `verify_attached` method.
        """
        pkts = pv.pkts
        pv.summary.show()

        LEADER = pv.vars['LEADER']
        LEADER_RLOC = pv.vars['LEADER_RLOC']
        LEADER_RLOC16 = pv.vars['LEADER_RLOC16']
        ROUTER_1 = pv.vars['ROUTER_1']
        ROUTER_2 = pv.vars['ROUTER_2']
        ROUTER_2_RLOC16 = pv.vars['ROUTER_2_RLOC16']
        ROUTER_3 = pv.vars['ROUTER_3']
        ROUTER_3_RLOC = pv.vars['ROUTER_3_RLOC']

        # Forwarding hops after Router_3's own transmission, as
        # (forwarding node, RLOC16 of the next hop) pairs.
        direct_path = ((ROUTER_1, LEADER_RLOC16),)
        path_via_router_2 = ((ROUTER_1, ROUTER_2_RLOC16), (ROUTER_2, LEADER_RLOC16))

        def verify_echo_route(hops_lower_bound, forwarding_hops):
            """Match the next Router_3 -> Leader ping and its expected route.

            Finds the next Echo Request sent by Router_3 to the Leader RLOC,
            requires its mesh header hops value to be strictly greater than
            `hops_lower_bound`, then requires the same request (matched by
            echo identifier) to be forwarded along `forwarding_hops` in
            order, and finally requires the Leader's Echo Reply addressed to
            Router_3.
            """
            request = pkts.filter_ping_request().\
                filter_wpan_src64(ROUTER_3).\
                filter_ipv6_dst(LEADER_RLOC).\
                must_next()
            request.must_verify(lambda p: p.lowpan.mesh.hops > hops_lower_bound)
            echo_id = request.icmpv6.echo.identifier

            for forwarder, next_hop_rloc16 in forwarding_hops:
                pkts.filter_ping_request(identifier=echo_id).\
                    filter_wpan_src64(forwarder).\
                    filter_wpan_dst16(next_hop_rloc16).\
                    must_next()

            pkts.filter_ping_reply(identifier=echo_id).\
                filter_wpan_src64(LEADER).\
                filter_ipv6_dst(ROUTER_3_RLOC).\
                must_next()

        # Step 1: Ensure topology is formed correctly
        for i in range(1, 4):
            with pkts.save_index():
                pv.verify_attached('ROUTER_%d' % i)

        # Step 2: Modify the link quality between the DUT and the Leader to be 3
        # Step 3: Router_3 sends an ICMPv6 Echo Request to the Leader
        #         The ICMPv6 Echo Request MUST take the shortest path:
        #         Router_3 -> DUT -> Leader
        #         The hopsLft field of the 6LoWPAN Mesh Header MUST be greater than
        #         the route cost to the destination
        verify_echo_route(2, direct_path)

        # Step 4: Modify the link quality between the DUT and the Leader to be 1
        # Step 5: Router_3 sends an ICMPv6 Echo Request to the Leader
        #         The ICMPv6 Echo Request MUST take the shortest path:
        #         Router_3 -> DUT -> Router_2 -> Leader
        #         The hopsLft field of the 6LoWPAN Mesh Header MUST be greater than
        #         the route cost to the destination
        verify_echo_route(3, path_via_router_2)

        # Step 6: Modify the link quality between the DUT and the Leader to be 2
        # Step 7: Router_3 sends an ICMPv6 Echo Request to the Leader
        #         The ICMPv6 Echo Request MUST take the shortest path:
        #         Router_3 -> DUT -> Leader
        #         The hopsLft field of the 6LoWPAN Mesh Header MUST be greater than
        #         the route cost to the destination
        verify_echo_route(2, direct_path)

        # Step 8: Modify the link quality between the DUT and the Leader to be 0
        # Step 9: Router_3 sends an ICMPv6 Echo Request to the Leader
        #         The ICMPv6 Echo Request MUST take the shortest path:
        #         Router_3 -> DUT -> Router_2 -> Leader
        #         The hopsLft field of the 6LoWPAN Mesh Header MUST be greater than
        #         the route cost to the destination
        verify_echo_route(3, path_via_router_2)


if __name__ == '__main__':
    unittest.main()