1#!/usr/bin/env python3 2# 3# Copyright (c) 2016, The OpenThread Authors. 4# All rights reserved. 5# 6# Redistribution and use in source and binary forms, with or without 7# modification, are permitted provided that the following conditions are met: 8# 1. Redistributions of source code must retain the above copyright 9# notice, this list of conditions and the following disclaimer. 10# 2. Redistributions in binary form must reproduce the above copyright 11# notice, this list of conditions and the following disclaimer in the 12# documentation and/or other materials provided with the distribution. 13# 3. Neither the name of the copyright holder nor the 14# names of its contributors may be used to endorse or promote products 15# derived from this software without specific prior written permission. 16# 17# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 18# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 19# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 20# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE 21# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 22# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 23# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 24# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 25# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 26# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 27# POSSIBILITY OF SUCH DAMAGE. 28# 29 30import unittest 31 32import config 33import thread_cert 34from pktverify.consts import MLE_CHILD_ID_RESPONSE, MLE_DATA_RESPONSE, MGMT_PENDING_SET_URI, MGMT_ACTIVE_SET_URI, MGMT_DATASET_CHANGED_URI, COAP_CODE_ACK, ACTIVE_OPERATION_DATASET_TLV, ACTIVE_TIMESTAMP_TLV, PENDING_TIMESTAMP_TLV, NM_CHANNEL_TLV, NM_CHANNEL_MASK_TLV, NM_EXTENDED_PAN_ID_TLV, NM_NETWORK_KEY_TLV, NM_NETWORK_MESH_LOCAL_PREFIX_TLV, NM_NETWORK_NAME_TLV, NM_PAN_ID_TLV, NM_PSKC_TLV, NM_SECURITY_POLICY_TLV, SOURCE_ADDRESS_TLV, LEADER_DATA_TLV, ACTIVE_TIMESTAMP_TLV, NETWORK_DATA_TLV, NM_BORDER_AGENT_LOCATOR_TLV, NM_COMMISSIONER_SESSION_ID_TLV, NM_DELAY_TIMER_TLV, PENDING_OPERATION_DATASET_TLV 35from pktverify.packet_verifier import PacketVerifier 36 37PANID_INIT = 0xface 38 39COMMISSIONER = 1 40LEADER = 2 41ROUTER = 3 42 43LEADER_ACTIVE_TIMESTAMP = 10 44ROUTER_ACTIVE_TIMESTAMP = 20 45ROUTER_PENDING_TIMESTAMP = 30 46ROUTER_PENDING_ACTIVE_TIMESTAMP = 25 47ROUTER_DELAY_TIMER = 3600000 48 49COMMISSIONER_PENDING_TIMESTAMP = 40 50COMMISSIONER_PENDING_ACTIVE_TIMESTAMP = 80 51COMMISSIONER_DELAY_TIMER = 60000 52COMMISSIONER_PENDING_CHANNEL = 20 53COMMISSIONER_PENDING_PANID = 0xafce 54 55 56class Cert_9_2_7_DelayTimer(thread_cert.TestCase): 57 SUPPORT_NCP = False 58 59 TOPOLOGY = { 60 COMMISSIONER: { 61 'name': 'COMMISSIONER', 62 'mode': 'rdn', 63 'allowlist': [LEADER] 64 }, 65 LEADER: { 66 'name': 'LEADER', 67 'mode': 'rdn', 68 'partition_id': 0xffffffff, 69 'allowlist': [COMMISSIONER] 70 }, 71 ROUTER: { 72 'name': 'ROUTER', 73 'mode': 'rdn', 74 'partition_id': 1, 75 }, 76 } 77 78 def test(self): 79 self.nodes[LEADER].start() 80 self.simulator.go(config.LEADER_STARTUP_DELAY) 81 self.assertEqual(self.nodes[LEADER].get_state(), 'leader') 82 83 self.nodes[COMMISSIONER].start() 84 self.simulator.go(config.ROUTER_STARTUP_DELAY) 85 self.assertEqual(self.nodes[COMMISSIONER].get_state(), 'router') 86 self.nodes[COMMISSIONER].commissioner_start() 87 self.simulator.go(3) 88 89 self.nodes[COMMISSIONER].send_mgmt_active_set(active_timestamp=LEADER_ACTIVE_TIMESTAMP,) 90 self.simulator.go(5) 91 92 self.nodes[ROUTER].start() 93 self.simulator.go(config.LEADER_STARTUP_DELAY) 94 self.assertEqual(self.nodes[ROUTER].get_state(), 'leader') 95 96 self.nodes[LEADER].add_allowlist(self.nodes[ROUTER].get_addr64()) 97 self.nodes[ROUTER].add_allowlist(self.nodes[LEADER].get_addr64()) 98 self.simulator.go(35) 99 self.assertEqual(self.nodes[COMMISSIONER].get_state(), 'router') 100 self.assertEqual(self.nodes[LEADER].get_state(), 'leader') 101 self.assertEqual(self.nodes[ROUTER].get_state(), 'router') 102 103 self.nodes[ROUTER].commissioner_start() 104 self.simulator.go(3) 105 self.nodes[ROUTER].send_mgmt_active_set(active_timestamp=ROUTER_ACTIVE_TIMESTAMP,) 106 self.simulator.go(30) 107 108 self.nodes[ROUTER].send_mgmt_pending_set( 109 pending_timestamp=ROUTER_PENDING_TIMESTAMP, 110 active_timestamp=ROUTER_PENDING_ACTIVE_TIMESTAMP, 111 delay_timer=ROUTER_DELAY_TIMER, 112 ) 113 self.simulator.go(60) 114 115 self.nodes[COMMISSIONER].send_mgmt_pending_set( 116 pending_timestamp=COMMISSIONER_PENDING_TIMESTAMP, 117 active_timestamp=COMMISSIONER_PENDING_ACTIVE_TIMESTAMP, 118 delay_timer=COMMISSIONER_DELAY_TIMER, 119 channel=COMMISSIONER_PENDING_CHANNEL, 120 panid=COMMISSIONER_PENDING_PANID, 121 ) 122 self.simulator.go(120) 123 124 self.assertEqual(self.nodes[LEADER].get_panid(), COMMISSIONER_PENDING_PANID) 125 self.assertEqual(self.nodes[COMMISSIONER].get_panid(), COMMISSIONER_PENDING_PANID) 126 self.assertEqual(self.nodes[ROUTER].get_panid(), COMMISSIONER_PENDING_PANID) 127 128 self.assertEqual(self.nodes[LEADER].get_channel(), COMMISSIONER_PENDING_CHANNEL) 129 self.assertEqual( 130 self.nodes[COMMISSIONER].get_channel(), 131 COMMISSIONER_PENDING_CHANNEL, 132 ) 133 self.assertEqual(self.nodes[ROUTER].get_channel(), COMMISSIONER_PENDING_CHANNEL) 134 135 self.collect_rloc16s() 136 self.collect_rlocs() 137 ipaddrs = self.nodes[ROUTER].get_addrs() 138 for ipaddr in ipaddrs: 139 if ipaddr[0:4] != 'fe80': 140 break 141 self.assertTrue(self.nodes[LEADER].ping(ipaddr)) 142 143 def verify(self, pv): 144 pkts = pv.pkts 145 pv.summary.show() 146 147 LEADER = pv.vars['LEADER'] 148 LEADER_RLOC16 = pv.vars['LEADER_RLOC16'] 149 COMMISSIONER = pv.vars['COMMISSIONER'] 150 COMMISSIONER_RLOC = pv.vars['COMMISSIONER_RLOC'] 151 ROUTER = pv.vars['ROUTER'] 152 ROUTER_RLOC = pv.vars['ROUTER_RLOC'] 153 ROUTER_RLOC16 = pv.vars['ROUTER_RLOC16'] 154 _lpkts = pkts.filter_wpan_src64(LEADER) 155 156 # Step 1: Ensure the topology is formed correctly 157 _lpkts.filter_wpan_dst64(COMMISSIONER).filter_mle_cmd(MLE_CHILD_ID_RESPONSE).must_next() 158 _lpkts_coap = _lpkts.copy() 159 160 # Step 4: Leader MUST send a unicast MLE Child ID Response to the Router 161 _lpkts.filter_wpan_dst64(ROUTER).filter_mle_cmd(MLE_CHILD_ID_RESPONSE).must_next( 162 ).must_verify(lambda p: {ACTIVE_OPERATION_DATASET_TLV, ACTIVE_TIMESTAMP_TLV} < set(p.mle.tlv.type) and { 163 NM_CHANNEL_TLV, NM_CHANNEL_MASK_TLV, NM_EXTENDED_PAN_ID_TLV, NM_NETWORK_KEY_TLV, 164 NM_NETWORK_MESH_LOCAL_PREFIX_TLV, NM_NETWORK_NAME_TLV, NM_PAN_ID_TLV, NM_PSKC_TLV, NM_SECURITY_POLICY_TLV 165 } <= set(p.thread_meshcop.tlv.type) and p.mle.tlv.active_tstamp == LEADER_ACTIVE_TIMESTAMP) 166 167 # Step 6: Leader automatically sends a MGMT_ACTIVE_SET.rsp to the Router 168 _lpkts.filter_ipv6_dst(ROUTER_RLOC).filter_coap_ack(MGMT_ACTIVE_SET_URI).must_next().must_verify( 169 lambda p: p.coap.code == COAP_CODE_ACK and p.thread_meshcop.tlv.state == 1) 170 171 # Step 7: Leader multicasts a MLE Data Response with the new information 172 _lpkts.filter_LLANMA().filter_mle_cmd(MLE_DATA_RESPONSE).must_next().must_verify( 173 lambda p: {SOURCE_ADDRESS_TLV, LEADER_DATA_TLV, ACTIVE_TIMESTAMP_TLV, NETWORK_DATA_TLV} <= set( 174 p.mle.tlv.type) and {NM_BORDER_AGENT_LOCATOR_TLV, NM_COMMISSIONER_SESSION_ID_TLV} <= set( 175 p.thread_meshcop.tlv.type) and p.thread_nwd.tlv.stable == [0] and p.mle.tlv.active_tstamp == 176 ROUTER_ACTIVE_TIMESTAMP) 177 178 # Step 10: Leader MUST send a unicast MLE Data Response to the Router 179 _lpkts.filter_wpan_dst64(ROUTER).filter_mle_cmd(MLE_DATA_RESPONSE).must_next( 180 ).must_verify(lambda p: {ACTIVE_OPERATION_DATASET_TLV, ACTIVE_TIMESTAMP_TLV} < set(p.mle.tlv.type) and { 181 NM_CHANNEL_TLV, NM_CHANNEL_MASK_TLV, NM_EXTENDED_PAN_ID_TLV, NM_NETWORK_KEY_TLV, 182 NM_NETWORK_MESH_LOCAL_PREFIX_TLV, NM_NETWORK_NAME_TLV, NM_PAN_ID_TLV, NM_PSKC_TLV, NM_SECURITY_POLICY_TLV 183 } <= set(p.thread_meshcop.tlv.type) and p.mle.tlv.active_tstamp == ROUTER_ACTIVE_TIMESTAMP) 184 185 # Step 12: Leader sends a MGMT_PENDING_SET.rsp to the Router with Status = Accept 186 _lpkts_coap.filter_ipv6_dst(ROUTER_RLOC).filter_coap_ack(MGMT_PENDING_SET_URI).must_next().must_verify( 187 lambda p: p.coap.code == COAP_CODE_ACK and p.thread_meshcop.tlv.state == 1) 188 189 # Step 13: Leader sends a multicast MLE Data Response 190 _lpkts.filter_LLANMA().filter_mle_cmd(MLE_DATA_RESPONSE).must_next().must_verify( 191 lambda p: { 192 SOURCE_ADDRESS_TLV, LEADER_DATA_TLV, ACTIVE_TIMESTAMP_TLV, PENDING_TIMESTAMP_TLV, NETWORK_DATA_TLV 193 } <= set(p.mle.tlv.type) and p.thread_nwd.tlv.stable == [0] and p.mle.tlv.active_tstamp == 194 ROUTER_ACTIVE_TIMESTAMP and p.mle.tlv.pending_tstamp == ROUTER_PENDING_TIMESTAMP) 195 196 # Step 14: The DUT MUST send MGMT_DATASET_CHANGED.ntf to the Router 197 _lpkts_coap.filter_wpan_dst16(ROUTER_RLOC16).filter_coap_request(MGMT_DATASET_CHANGED_URI).must_next() 198 199 # Step 16: Leader MUST send a unicast MLE Data Response to the Router 200 _lpkts.filter_wpan_dst64(ROUTER).filter_mle_cmd(MLE_DATA_RESPONSE).must_next().must_verify( 201 lambda p: {ACTIVE_TIMESTAMP_TLV, PENDING_TIMESTAMP_TLV} < set(p.mle.tlv.type) and p.mle.tlv.active_tstamp 202 == ROUTER_ACTIVE_TIMESTAMP and p.mle.tlv.pending_tstamp == ROUTER_PENDING_TIMESTAMP) 203 204 # Step 18: The DUT MUST send MGMT_PENDING_SET.rsp to the Commissioner 205 _lpkts_coap.filter_ipv6_dst(COMMISSIONER_RLOC).filter_coap_ack(MGMT_PENDING_SET_URI).must_next().must_verify( 206 lambda p: p.coap.code == COAP_CODE_ACK and p.thread_meshcop.tlv.state == 1) 207 208 # Step 19: Leader MUST send a unicast MLE Data Response to the Router 209 _lpkts.filter_LLANMA().filter_mle_cmd(MLE_DATA_RESPONSE).must_next().must_verify( 210 lambda p: { 211 SOURCE_ADDRESS_TLV, LEADER_DATA_TLV, NETWORK_DATA_TLV, ACTIVE_TIMESTAMP_TLV, PENDING_TIMESTAMP_TLV 212 } <= set(p.mle.tlv.type) and p.thread_nwd.tlv.stable == [0] and p.mle.tlv.active_tstamp == 213 ROUTER_ACTIVE_TIMESTAMP and p.mle.tlv.pending_tstamp == COMMISSIONER_PENDING_TIMESTAMP) 214 215 # Step 20: Leader MUST send a unicast MLE Data Response to the Router 216 _lpkts.filter_wpan_dst64(ROUTER).filter_mle_cmd(MLE_DATA_RESPONSE).must_next( 217 ).must_verify(lambda p: {ACTIVE_TIMESTAMP_TLV, PENDING_TIMESTAMP_TLV, PENDING_OPERATION_DATASET_TLV} < set( 218 p.mle.tlv.type) and {NM_CHANNEL_TLV, NM_COMMISSIONER_SESSION_ID_TLV, NM_PAN_ID_TLV, NM_DELAY_TIMER_TLV} <= 219 set(p.thread_meshcop.tlv.type) and p.mle.tlv.active_tstamp == ROUTER_ACTIVE_TIMESTAMP and p.mle. 220 tlv.pending_tstamp == COMMISSIONER_PENDING_TIMESTAMP and p.thread_meshcop.tlv.pan_id == 221 [COMMISSIONER_PENDING_PANID] and p.thread_meshcop.tlv.channel == [COMMISSIONER_PENDING_CHANNEL]) 222 223 # Step 21: Router MUST respond with an ICMPv6 Echo Reply 224 pkts.filter_wpan_src16_dst16(ROUTER_RLOC16, LEADER_RLOC16).filter_ping_reply().must_next() 225 226 227if __name__ == '__main__': 228 unittest.main() 229