#!/usr/bin/env python3
#
#  Copyright (c) 2018, The OpenThread Authors.
#  All rights reserved.
#
#  Redistribution and use in source and binary forms, with or without
#  modification, are permitted provided that the following conditions are met:
#  1. Redistributions of source code must retain the above copyright
#     notice, this list of conditions and the following disclaimer.
#  2. Redistributions in binary form must reproduce the above copyright
#     notice, this list of conditions and the following disclaimer in the
#     documentation and/or other materials provided with the distribution.
#  3. Neither the name of the copyright holder nor the
#     names of its contributors may be used to endorse or promote products
#     derived from this software without specific prior written permission.
#
#  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
#  AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
#  IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
#  ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
#  LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
#  CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
#  SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
#  INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
#  CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
#  ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
#  POSSIBILITY OF SUCH DAMAGE.
#

import unittest

import command
import config
import thread_cert
from pktverify.consts import ADDR_QRY_URI, ADDR_NTF_URI, NL_ML_EID_TLV, NL_RLOC16_TLV, NL_TARGET_EID_TLV
from pktverify.packet_verifier import PacketVerifier
from pktverify.bytes import Bytes
from pktverify.addrs import Ipv6Addr

LEADER = 1
DUT_ROUTER1 = 2
MED1 = 3
X = 'fd00:db8:0000:0000:aa55:aa55:aa55:aa55'

# Test Purpose and Description:
# -----------------------------
# The purpose of this test case is to validate the way AQ_TIMEOUT and
# AQ_RETRY_TIMEOUT intervals are used in the Address Query transmission
# algorithm.
#
# Test Topology:
# -------------
#     Leader
#       |
#   Router(DUT)
#       |
#      MED
#
# DUT Types:
# ----------
#  Router


class Cert_5_3_11_AddressQueryTimeoutIntervals(thread_cert.TestCase):
    """Certification test 5.3.11: Address Query timeout intervals.

    ``test`` drives a three-node network (Leader - Router(DUT) - MED) and
    makes the MED ping the non-existent address ``X`` three times.
    ``verify`` then inspects the captured packets for the Address Query
    traffic the Router is expected to produce (or not produce) around each
    of those pings.
    """

    USE_MESSAGE_FACTORY = False

    # Linear topology: each node only allow-lists its direct neighbour(s).
    TOPOLOGY = {
        LEADER: {
            'name': 'LEADER',
            'mode': 'rdn',
            'allowlist': [DUT_ROUTER1]
        },
        DUT_ROUTER1: {
            'name': 'ROUTER',
            'mode': 'rdn',
            'allowlist': [LEADER, MED1]
        },
        MED1: {
            'name': 'MED',
            'is_mtd': True,
            'mode': 'rn',
            'allowlist': [DUT_ROUTER1]
        },
    }

    def test(self):
        # Scenario driver: form the network, then have MED1 ping the
        # non-existent address X three times with the simulator advanced by
        # AQ_TIMEOUT and ADDRESS_QUERY_INITIAL_RETRY_DELAY in between.
        # (Kept as a comment rather than a docstring so unittest's verbose
        # test description is unchanged.)

        # 1 Build the topology: Leader, then Router (DUT), then MED.
        self.nodes[LEADER].start()
        self.simulator.go(config.LEADER_STARTUP_DELAY)
        self.assertEqual(self.nodes[LEADER].get_state(), 'leader')

        self.nodes[DUT_ROUTER1].start()
        self.simulator.go(config.ROUTER_STARTUP_DELAY)
        self.assertEqual(self.nodes[DUT_ROUTER1].get_state(), 'router')

        self.nodes[MED1].start()
        self.simulator.go(5)
        self.assertEqual(self.nodes[MED1].get_state(), 'child')

        self.collect_rlocs()

        # 2 MED1: MED1 sends an ICMPv6 Echo Request to a non-existent
        #         mesh-local address X
        self.assertFalse(self.nodes[MED1].ping(X))

        self.simulator.go(config.AQ_TIMEOUT)

        # 3 MED1: MED1 sends an ICMPv6 Echo Request to a non-existent mesh-local address X before
        #         ADDRESS_QUERY_INITIAL_RETRY_DELAY timeout expires
        self.assertFalse(self.nodes[MED1].ping(X))

        self.simulator.go(config.ADDRESS_QUERY_INITIAL_RETRY_DELAY)

        # 4 MED1: MED1 sends an ICMPv6 Echo Request to a non-existent mesh-local address X after
        #         ADDRESS_QUERY_INITIAL_RETRY_DELAY timeout expired
        self.assertFalse(self.nodes[MED1].ping(X))

    def verify(self, pv):
        """Check the captured packets against certification steps 1-4.

        Args:
            pv: packet verifier object supplied by the test framework. This
                method reads ``pv.pkts``, ``pv.summary`` and ``pv.vars`` and
                calls ``pv.verify_attached``.

        Expectations are expressed through ``must_next()`` and
        ``must_not_next()`` on the packet filters; nothing is returned.
        """
        pkts = pv.pkts
        pv.summary.show()

        ROUTER = pv.vars['ROUTER']
        ROUTER_RLOC = pv.vars['ROUTER_RLOC']
        MED = pv.vars['MED']
        MM = pv.vars['MM_PORT']

        # Build the address object once instead of once per call site (and
        # once per inspected packet inside the filter lambdas).
        target = Ipv6Addr(X)

        def next_ping_to_target():
            # Advance `pkts` to the MED's next Echo Request addressed to X
            # and return the resulting `pkts.index`, used below as a window
            # boundary.
            (pkts.filter_ping_request()
             .filter_wpan_src64(MED)
             .filter_ipv6_dst(target)
             .must_next())
            return pkts.index

        def address_queries(packets):
            # Narrow `packets` to Address Query CoAP requests sent by the
            # Router; filter_RLARMA() is presumably the match on the
            # Realm-Local All-Routers destination named in the steps below.
            return (packets.filter_wpan_src64(ROUTER)
                    .filter_RLARMA()
                    .filter_coap_request(ADDR_QRY_URI, port=MM))

        # Step 1: Build the topology as described

        pv.verify_attached('ROUTER', 'LEADER')
        pv.verify_attached('MED', 'ROUTER', 'MTD')

        # Step 2: MED sends an ICMPv6 Echo Request to a nonexistent mesh-local
        #         address X
        #         The DUT MUST generate an Address Query Request on MED’s behalf
        #         The Address Query Request MUST be sent to the Realm-Local
        #         All-Routers address (FF03::2)
        #             CoAP URI-Path
        #                 - NON POST coap://<FF03::2>
        #             CoAP Payload
        #                 - Target EID TLV – non-existent mesh-local
        #         An Address Query Notification MUST NOT be received within
        #         AQ_TIMEOUT interval.

        step2_start = next_ping_to_target()

        (address_queries(pkts)
         .filter(lambda p: p.thread_address.tlv.target_eid == target)
         .must_next())

        # Step 3: MED sends an ICMPv6 Echo Request to a nonexistent mesh-local
        #         address X before ADDRESS_QUERY_INITIAL_RETRY_DELAY timeout
        #         expired
        #         The DUT MUST NOT initiate a new Address Query frame

        step2_end = next_ping_to_target()

        # The second ping closes the Step 2 window, so Step 2's "no Address
        # Query Notification" requirement is checked here, over the packets
        # between the first and second ping.
        (pkts.range(step2_start, step2_end)
         .filter_ipv6_dst(ROUTER_RLOC)
         .filter_coap_request(ADDR_NTF_URI, port=MM)
         .must_not_next())

        # Step 4: MED sends an ICMPv6 Echo Request to a nonexistent mesh-local
        #         address X after ADDRESS_QUERY_INITIAL_RETRY_DELAY timeout
        #         expired
        #         The DUT MUST generate an Address Query Request on MED’s behalf
        #         The Address Query Request MUST be sent to the Realm-Local
        #         All-Routers address (FF03::2)
        #             CoAP URI-Path
        #                 - NON POST coap://<FF03::2>
        #             CoAP Payload
        #                 - Target EID TLV – non-existent mesh-local

        step3_end = next_ping_to_target()

        # Step 3's "MUST NOT initiate a new Address Query" requirement: no
        # query from the Router between the second and third ping.
        address_queries(pkts.range(step2_end, step3_end)).must_not_next()

        # Step 4 proper: after the third ping a fresh Address Query for X
        # must appear.
        (address_queries(pkts)
         .filter(lambda p: p.thread_address.tlv.target_eid == target)
         .must_next())


if __name__ == '__main__':
    unittest.main()