1#!/usr/bin/env python3 2# 3# Copyright (c) 2016, The OpenThread Authors. 4# All rights reserved. 5# 6# Redistribution and use in source and binary forms, with or without 7# modification, are permitted provided that the following conditions are met: 8# 1. Redistributions of source code must retain the above copyright 9# notice, this list of conditions and the following disclaimer. 10# 2. Redistributions in binary form must reproduce the above copyright 11# notice, this list of conditions and the following disclaimer in the 12# documentation and/or other materials provided with the distribution. 13# 3. Neither the name of the copyright holder nor the 14# names of its contributors may be used to endorse or promote products 15# derived from this software without specific prior written permission. 16# 17# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 18# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 19# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 20# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE 21# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 22# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 23# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 24# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 25# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 26# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 27# POSSIBILITY OF SUCH DAMAGE. 
#

import unittest

import command
import config
import mle
import thread_cert
from command import CheckType
from pktverify.consts import (
    MLE_PARENT_REQUEST,
    MLE_CHILD_ID_REQUEST,
    ADDR_SOL_URI,
    MODE_TLV,
    TIMEOUT_TLV,
    CHALLENGE_TLV,
    RESPONSE_TLV,
    LINK_LAYER_FRAME_COUNTER_TLV,
    ADDRESS16_TLV,
    NETWORK_DATA_TLV,
    TLV_REQUEST_TLV,
    SCAN_MASK_TLV,
    VERSION_TLV,
    ADDRESS_REGISTRATION_TLV,
    NL_MAC_EXTENDED_ADDRESS_TLV,
    NL_RLOC16_TLV,
    NL_STATUS_TLV,
    NL_ROUTER_MASK_TLV,
    COAP_CODE_POST,
    COAP_CODE_ACK,
)
from pktverify.packet_verifier import PacketVerifier
from pktverify.null_field import nullField

# Node identifiers used as keys of TOPOLOGY and of self.nodes.
LEADER = 1
ROUTER1 = 2

# Test Purpose and Description:
# -----------------------------
# The purpose of this test case is to verify that when the Leader
# de-allocates a router ID, the DUT, as a router, re-attaches.
#
# Test Topology:
# -------------
#             Leader
#               |
#             Router
#
# DUT Types:
# ----------
#  Router


class Cert_5_1_06_RemoveRouterId(thread_cert.TestCase):
    """Leader releases the router's ID; the router is expected to re-attach."""

    USE_MESSAGE_FACTORY = False

    # Two nodes, each allowing only the other one.
    TOPOLOGY = {
        LEADER: {'name': 'LEADER', 'mode': 'rdn', 'allowlist': [ROUTER1]},
        ROUTER1: {'name': 'ROUTER', 'mode': 'rdn', 'allowlist': [LEADER]},
    }

    def test(self):
        """Form the two-node network, release the router's ID on the leader,
        and check that the router is a router again and still answers pings.
        """
        leader = self.nodes[LEADER]
        leader.start()
        self.simulator.go(config.LEADER_STARTUP_DELAY)
        self.assertEqual(leader.get_state(), 'leader')

        router = self.nodes[ROUTER1]
        router.start()
        self.simulator.go(config.ROUTER_STARTUP_DELAY)
        self.assertEqual(router.get_state(), 'router')
        rloc16 = router.get_addr16()
        self.collect_rloc16s()

        # Connectivity check before the router ID is released.
        router_lladdr = router.get_ip6_address(config.ADDRESS_TYPE.LINK_LOCAL)
        self.assertTrue(leader.ping(router_lladdr))

        # The value handed to release_router_id is the RLOC16 shifted right
        # by 10 bits.
        router_id = rloc16 >> 10
        leader.release_router_id(router_id)
        self.simulator.go(5)
        self.assertEqual(router.get_state(), 'router')

        # Connectivity check after the release.
        self.assertTrue(leader.ping(router_lladdr))

    def verify(self, pv):
        """Check the captured packets against the expected message sequence.

        Reads ``pv.pkts`` and the ``LEADER``, ``LEADER_RLOC16``, ``ROUTER``
        and ``MM_PORT`` entries of ``pv.vars``; each expectation is enforced
        through ``must_next()`` / ``must_not_verify()``.
        """
        pkts = pv.pkts
        pv.summary.show()

        leader_addr = pv.vars['LEADER']
        leader_rloc16 = pv.vars['LEADER_RLOC16']
        router_addr = pv.vars['ROUTER']
        mm_port = pv.vars['MM_PORT']

        # TLV sets that the inspected messages must contain.
        addr_sol_ack_tlvs = {NL_STATUS_TLV, NL_RLOC16_TLV, NL_ROUTER_MASK_TLV}
        parent_req_tlvs = {CHALLENGE_TLV, MODE_TLV, SCAN_MASK_TLV, VERSION_TLV}
        child_id_req_tlvs = {
            LINK_LAYER_FRAME_COUNTER_TLV,
            MODE_TLV,
            RESPONSE_TLV,
            TIMEOUT_TLV,
            TLV_REQUEST_TLV,
            ADDRESS16_TLV,
            NETWORK_DATA_TLV,
            VERSION_TLV,
        }
        addr_sol_req_tlvs = {NL_MAC_EXTENDED_ADDRESS_TLV, NL_RLOC16_TLV, NL_STATUS_TLV}

        # Step 1: Verify topology is formed correctly.

        pv.verify_attached('ROUTER')

        def is_first_addr_sol_ack(p):
            return (addr_sol_ack_tlvs <= set(p.coap.tlv.type) and p.coap.code == COAP_CODE_ACK and
                    p.thread_address.tlv.status == 0)

        first_ack = (pkts.filter_wpan_src64(leader_addr)
                     .filter_coap_ack(ADDR_SOL_URI, port=mm_port)
                     .filter(is_first_addr_sol_ack)
                     .must_next())
        # RLOC16 carried in that first acknowledgement; later checks compare
        # against it.
        first_rloc16_holder = first_ack

        # Step 3: Router send a properly formatted
        #         MLE Parent Request,
        #         MLE Child ID Request,
        #         and Address Solicit Request
        #         messages to the Leader.

        def is_parent_request(p):
            return (parent_req_tlvs <= set(p.mle.tlv.type) and p.ipv6.hlim == 255 and
                    p.mle.tlv.scan_mask.r == 1 and p.mle.tlv.scan_mask.e == 0)

        (pkts.filter_wpan_src64(router_addr)
         .filter_LLARMA()
         .filter_mle_cmd(MLE_PARENT_REQUEST)
         .filter(is_parent_request)
         .must_next())

        def is_child_id_request(p):
            return (child_id_req_tlvs <= set(p.mle.tlv.type) and p.mle.tlv.addr16 is nullField and
                    p.thread_nwd.tlv.type is nullField)

        child_id_request = (pkts.filter_wpan_src64(router_addr)
                            .filter_wpan_dst64(leader_addr)
                            .filter_mle_cmd(MLE_CHILD_ID_REQUEST)
                            .filter(is_child_id_request)
                            .must_next())

        def has_address_registration(p):
            return ADDRESS_REGISTRATION_TLV in p.mle.tlv.type

        child_id_request.must_not_verify(has_address_registration)

        def is_addr_sol_request(p):
            # The request must carry the same RLOC16 as the first acknowledgement.
            return (addr_sol_req_tlvs <= set(p.coap.tlv.type) and p.coap.code == COAP_CODE_POST and
                    p.thread_address.tlv.rloc16 == first_rloc16_holder.thread_address.tlv.rloc16)

        (pkts.filter_wpan_src64(router_addr)
         .filter_wpan_dst16(leader_rloc16)
         .filter_coap_request(ADDR_SOL_URI, port=mm_port)
         .filter(is_addr_sol_request)
         .must_next())

        def is_second_addr_sol_ack(p):
            # The new acknowledgement must carry a different RLOC16 than the first one.
            return (addr_sol_ack_tlvs <= set(p.coap.tlv.type) and p.coap.code == COAP_CODE_ACK and
                    p.thread_address.tlv.rloc16 != first_rloc16_holder.thread_address.tlv.rloc16 and
                    p.thread_address.tlv.status == 0)

        (pkts.filter_wpan_src64(leader_addr)
         .filter_coap_ack(ADDR_SOL_URI, port=mm_port)
         .filter(is_second_addr_sol_ack)
         .must_next())

        # Step 4: DUT responds with ICMPv6 Echo Reply

        echo_request = (pkts.filter_ping_request()
                        .filter_wpan_src64(leader_addr)
                        .filter_wpan_dst64(router_addr)
                        .must_next())
        echo_id = echo_request.icmpv6.echo.identifier
        (pkts.filter_ping_reply(identifier=echo_id)
         .filter_wpan_src64(router_addr)
         .filter_wpan_dst64(leader_addr)
         .must_next())


if __name__ == '__main__':
    unittest.main()