1#!/usr/bin/env python3 2# 3# Copyright (c) 2016, The OpenThread Authors. 4# All rights reserved. 5# 6# Redistribution and use in source and binary forms, with or without 7# modification, are permitted provided that the following conditions are met: 8# 1. Redistributions of source code must retain the above copyright 9# notice, this list of conditions and the following disclaimer. 10# 2. Redistributions in binary form must reproduce the above copyright 11# notice, this list of conditions and the following disclaimer in the 12# documentation and/or other materials provided with the distribution. 13# 3. Neither the name of the copyright holder nor the 14# names of its contributors may be used to endorse or promote products 15# derived from this software without specific prior written permission. 16# 17# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 18# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 19# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 20# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE 21# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 22# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 23# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 24# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 25# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 26# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 27# POSSIBILITY OF SUCH DAMAGE. 28# 29 30import unittest 31 32import command 33import config 34import thread_cert 35from pktverify.consts import MLE_CHILD_ID_REQUEST, MLE_CHILD_ID_RESPONSE, ADDR_QRY_URI, ADDR_NTF_URI, NL_ML_EID_TLV, NL_RLOC16_TLV, NL_TARGET_EID_TLV, COAP_CODE_POST 36from pktverify.packet_verifier import PacketVerifier 37 38LEADER = 1 39ROUTER1 = 2 40DUT_ROUTER2 = 3 41ROUTER3 = 4 42MED1 = 5 43MED1_TIMEOUT = 3 44 45# Test Purpose and Description: 46# ----------------------------- 47# The purpose of this test case is to validate that the DUT is able to generate 48# Address Query messages and properly respond with Address Notification messages. 49# 50# Test Topology: 51# ------------- 52# Router_1 - Leader 53# / \ 54# Router_3 - Router_2(DUT) 55# | 56# MED 57# 58# DUT Types: 59# ---------- 60# Router 61 62 63class Cert_5_3_3_AddressQuery(thread_cert.TestCase): 64 USE_MESSAGE_FACTORY = False 65 66 TOPOLOGY = { 67 LEADER: { 68 'name': 'LEADER', 69 'mode': 'rdn', 70 'allowlist': [ROUTER1, DUT_ROUTER2, ROUTER3] 71 }, 72 ROUTER1: { 73 'name': 'ROUTER_1', 74 'mode': 'rdn', 75 'allowlist': [LEADER] 76 }, 77 DUT_ROUTER2: { 78 'name': 'ROUTER_2', 79 'mode': 'rdn', 80 'allowlist': [LEADER, ROUTER3, MED1] 81 }, 82 ROUTER3: { 83 'name': 'ROUTER_3', 84 'mode': 'rdn', 85 'allowlist': [LEADER, DUT_ROUTER2] 86 }, 87 MED1: { 88 'name': 'MED', 89 'is_mtd': True, 90 'mode': 'rn', 91 'timeout': 3, 92 'allowlist': [DUT_ROUTER2] 93 }, 94 } 95 96 def test(self): 97 # 1 98 self.nodes[LEADER].start() 99 self.simulator.go(config.LEADER_STARTUP_DELAY) 100 self.assertEqual(self.nodes[LEADER].get_state(), 'leader') 101 102 self.nodes[ROUTER1].start() 103 self.nodes[DUT_ROUTER2].start() 104 self.nodes[ROUTER3].start() 105 self.nodes[MED1].start() 106 self.simulator.go(config.ROUTER_STARTUP_DELAY) 107 108 self.assertEqual(self.nodes[ROUTER1].get_state(), 'router') 109 self.assertEqual(self.nodes[DUT_ROUTER2].get_state(), 'router') 110 self.assertEqual(self.nodes[ROUTER3].get_state(), 'router') 111 self.assertEqual(self.nodes[MED1].get_state(), 'child') 112 self.collect_ipaddrs() 113 self.collect_rlocs() 114 self.collect_rloc16s() 115 116 # 2 117 router3_mleid = self.nodes[ROUTER3].get_ip6_address(config.ADDRESS_TYPE.ML_EID) 118 self.assertTrue(self.nodes[MED1].ping(router3_mleid)) 119 120 # 3 121 # Wait the finish of address resolution traffic triggerred by previous 122 # ping. 123 self.simulator.go(5) 124 125 med1_mleid = self.nodes[MED1].get_ip6_address(config.ADDRESS_TYPE.ML_EID) 126 self.assertTrue(self.nodes[ROUTER1].ping(med1_mleid)) 127 128 # 4 129 # Wait the finish of address resolution traffic triggerred by previous 130 # ping. 131 self.simulator.go(5) 132 133 self.assertTrue(self.nodes[MED1].ping(router3_mleid)) 134 135 # 5 136 # Power off ROUTER3 and wait for leader to expire its Router ID. 137 # In this topology, ROUTER3 has two neighbors (Leader and DUT_ROUTER2), 138 # so the wait time is (MAX_NEIGHBOR_AGE (100s) + worst propagation time (32s * 15) for bad routing +\ 139 # INFINITE_COST_TIMEOUT (90s) + transmission time + extra redundancy), 140 # totally ~700s. 141 self.nodes[ROUTER3].stop() 142 self.simulator.go(700) 143 144 self.assertFalse(self.nodes[MED1].ping(router3_mleid)) 145 146 # 6 147 self.nodes[MED1].stop() 148 self.simulator.go(MED1_TIMEOUT) 149 150 self.assertFalse(self.nodes[ROUTER1].ping(med1_mleid)) 151 self.assertFalse(self.nodes[ROUTER1].ping(med1_mleid)) 152 153 def verify(self, pv): 154 pkts = pv.pkts 155 pv.summary.show() 156 157 LEADER = pv.vars['LEADER'] 158 LEADER_MLEID = pv.vars['LEADER_MLEID'] 159 ROUTER_1 = pv.vars['ROUTER_1'] 160 ROUTER_1_RLOC = pv.vars['ROUTER_1_RLOC'] 161 ROUTER_1_MLEID = pv.vars['ROUTER_1_MLEID'] 162 ROUTER_2 = pv.vars['ROUTER_2'] 163 ROUTER_2_RLOC16 = pv.vars['ROUTER_2_RLOC16'] 164 ROUTER_2_RLOC = pv.vars['ROUTER_2_RLOC'] 165 ROUTER_2_MLEID = pv.vars['ROUTER_2_MLEID'] 166 MED = pv.vars['MED'] 167 MED_RLOC16 = pv.vars['MED_RLOC16'] 168 MED_MLEID = pv.vars['MED_MLEID'] 169 ROUTER_3 = pv.vars['ROUTER_3'] 170 ROUTER_3_MLEID = pv.vars['ROUTER_3_MLEID'] 171 MM = pv.vars['MM_PORT'] 172 173 # Step 1: Build the topology as described 174 for i in range(1, 4): 175 with pkts.save_index(): 176 pv.verify_attached('ROUTER_%d' % i) 177 pkts.filter_wpan_src64(MED).\ 178 filter_wpan_dst64(ROUTER_2).\ 179 filter_mle_cmd(MLE_CHILD_ID_REQUEST).\ 180 must_next() 181 pkts.filter_wpan_src64(ROUTER_2).\ 182 filter_wpan_dst64(MED).\ 183 filter_mle_cmd(MLE_CHILD_ID_RESPONSE).\ 184 must_next() 185 186 # Step 2: MED sends an ICMPv6 Echo Request to Router_3 ML-EID 187 # The DUT MUST generate an Address Query Request on MED’s behalf 188 # to find Router_3 address. 189 # The Address Query Request MUST be sent to the Realm-Local 190 # All-Routers address (FF03::2) 191 # CoAP URI-Path 192 # - NON POST coap://<FF03::2> 193 # CoAP Payload 194 # - Target EID TLV 195 # The DUT MUST receive and process the incoming Address Notification 196 # The DUT MUST then forward the ICMPv6 Echo Request from MED and 197 # forward the ICMPv6 Echo Reply to MED 198 199 _pkt = pkts.filter_ping_request().\ 200 filter_wpan_src64(MED).\ 201 filter_ipv6_dst(ROUTER_3_MLEID).\ 202 must_next() 203 pkts.filter_wpan_src64(ROUTER_2).\ 204 filter_RLARMA().\ 205 filter_coap_request(ADDR_QRY_URI, port=MM).\ 206 filter(lambda p: p.thread_address.tlv.target_eid == ROUTER_3_MLEID).\ 207 must_next() 208 _pkt1 = pkts.filter_wpan_src64(ROUTER_3).\ 209 filter_ipv6_dst(ROUTER_2_RLOC).\ 210 filter_coap_request(ADDR_NTF_URI, port=MM).\ 211 filter(lambda p: { 212 NL_ML_EID_TLV, 213 NL_RLOC16_TLV, 214 NL_TARGET_EID_TLV 215 } == set(p.coap.tlv.type) and\ 216 p.thread_address.tlv.target_eid == ROUTER_3_MLEID and\ 217 p.coap.code == COAP_CODE_POST 218 ).\ 219 must_next() 220 pkts.filter_ping_request(identifier=_pkt.icmpv6.echo.identifier).\ 221 filter_wpan_src64(ROUTER_2).\ 222 filter_ipv6_dst(ROUTER_3_MLEID).\ 223 must_next() 224 pkts.filter_ping_reply(identifier=_pkt.icmpv6.echo.identifier).\ 225 filter_wpan_src64(ROUTER_3).\ 226 filter_dst16(ROUTER_2_RLOC16).\ 227 must_next() 228 pkts.filter_ping_reply(identifier=_pkt.icmpv6.echo.identifier).\ 229 filter_wpan_src64(ROUTER_2).\ 230 filter_wpan_dst16(MED_RLOC16).\ 231 must_next() 232 233 # Step 3: Router_1 sends an ICMPv6 Echo Request to the MED ML-EID address 234 # The DUT MUST respond to the Address Query Request with a properly 235 # formatted Address Notification Message: 236 # CoAP URI-Path 237 # - CON POST coap://[<Address Query Source>]:MM/a/an 238 # CoAP Payload 239 # - ML-EID TLV 240 # - RLOC16 TLV 241 # - Target EID TLV 242 # The IPv6 Source address MUST be the RLOC of the originator 243 # The IPv6 Destination address MUST be the RLOC of the destination 244 245 pkts.filter_wpan_src64(ROUTER_1).\ 246 filter_RLARMA().\ 247 filter_coap_request(ADDR_QRY_URI, port=MM).\ 248 filter(lambda p: p.thread_address.tlv.target_eid == MED_MLEID).\ 249 must_next() 250 pkts.filter_ipv6_src_dst(ROUTER_2_RLOC, ROUTER_1_RLOC).\ 251 filter_coap_request(ADDR_NTF_URI, port=MM).\ 252 filter(lambda p: { 253 NL_ML_EID_TLV, 254 NL_RLOC16_TLV, 255 NL_TARGET_EID_TLV 256 } <= set(p.coap.tlv.type) and\ 257 p.thread_address.tlv.target_eid == MED_MLEID and\ 258 p.coap.code == COAP_CODE_POST 259 ).\ 260 must_next() 261 _pkt = pkts.filter_ping_request().\ 262 filter_wpan_src64(ROUTER_1).\ 263 filter_ipv6_dst(MED_MLEID).\ 264 must_next() 265 pkts.filter_ping_request(identifier=_pkt.icmpv6.echo.identifier).\ 266 filter_wpan_src64(ROUTER_2).\ 267 filter_ipv6_dst(MED_MLEID).\ 268 must_next() 269 pkts.filter_ping_reply(identifier=_pkt.icmpv6.echo.identifier).\ 270 filter_wpan_src64(MED).\ 271 filter_wpan_dst16(ROUTER_2_RLOC16).\ 272 must_next() 273 pkts.filter_ping_reply(identifier=_pkt.icmpv6.echo.identifier).\ 274 filter_wpan_src64(ROUTER_2).\ 275 filter_ipv6_dst(ROUTER_1_MLEID).\ 276 must_next() 277 278 # Step 4: MED sends an ICMPv6 Echo Request to the Router_3 ML-EID 279 # The DUT MUST NOT send an Address Query, as the Router_3 address 280 # should be cached 281 # The DUT MUST forward the ICMPv6 Echo Reply to MED 282 283 _pkt = pkts.filter_ping_request().\ 284 filter_wpan_src64(MED).\ 285 filter_ipv6_dst(ROUTER_3_MLEID).\ 286 must_next() 287 lstart = pkts.index 288 pkts.filter_ping_request(identifier=_pkt.icmpv6.echo.identifier).\ 289 filter_wpan_src64(ROUTER_2).\ 290 filter_ipv6_dst(ROUTER_3_MLEID).\ 291 must_next() 292 lend = pkts.index 293 294 pkts.range(lstart, lend).\ 295 filter_wpan_src64(ROUTER_2).\ 296 filter_RLARMA().\ 297 filter_coap_request(ADDR_QRY_URI).\ 298 must_not_next() 299 300 pkts.filter_ping_reply(identifier=_pkt.icmpv6.echo.identifier).\ 301 filter_wpan_src64(ROUTER_3).\ 302 filter_wpan_dst16(ROUTER_2_RLOC16).\ 303 must_next() 304 _pkt1 = pkts.filter_ping_reply(identifier=_pkt.icmpv6.echo.identifier).\ 305 filter_wpan_src64(ROUTER_2).\ 306 filter_wpan_dst16(MED_RLOC16).\ 307 must_next() 308 309 # Step 5: Power off Router_3 and wait for the Leader to expire its Router ID 310 # Send an ICMPv6 Echo Request from MED to the Router_3 ML-EID address 311 # The DUT MUST update its address cache and removes all entries based 312 # on Router_3’s Router ID. 313 # The DUT MUST send an Address Query to discover Router_3’s RLOC 314 # address. 315 316 _pkt = pkts.filter_ping_request().\ 317 filter_wpan_src64(MED).\ 318 filter_ipv6_dst(ROUTER_3_MLEID).\ 319 filter(lambda p: p.sniff_timestamp - _pkt1.sniff_timestamp >= 700).\ 320 must_next() 321 pkts.filter_wpan_src64(ROUTER_2).\ 322 filter_RLARMA().\ 323 filter_coap_request(ADDR_QRY_URI, port=MM).\ 324 filter(lambda p: p.thread_address.tlv.target_eid == ROUTER_3_MLEID and\ 325 p.coap.code == COAP_CODE_POST 326 ).\ 327 must_next() 328 329 # Step 6: Power off MED and wait for the DUT to timeout the child. 330 # Send two ICMPv6 Echo Requests from Router_1 to MED ML-EID 331 # The DUT MUST NOT respond with an Address Notification message 332 333 pkts.filter_ping_request().\ 334 filter_wpan_src64(ROUTER_1).\ 335 filter_ipv6_dst(MED_MLEID).\ 336 must_next() 337 338 pkts.filter_wpan_src64(ROUTER_1).\ 339 filter_RLARMA().\ 340 filter_coap_request(ADDR_QRY_URI, port=MM).\ 341 filter(lambda p: p.thread_address.tlv.target_eid == MED_MLEID and\ 342 p.coap.code == COAP_CODE_POST 343 ).\ 344 must_next() 345 346 pkts.filter_ipv6_src_dst(ROUTER_2_RLOC, ROUTER_1_RLOC).\ 347 filter_coap_request(ADDR_NTF_URI).\ 348 must_not_next() 349 350 351if __name__ == '__main__': 352 unittest.main() 353