1#!/usr/bin/env python3 2# 3# Copyright (c) 2019, The OpenThread Authors. 4# All rights reserved. 5# 6# Redistribution and use in source and binary forms, with or without 7# modification, are permitted provided that the following conditions are met: 8# 1. Redistributions of source code must retain the above copyright 9# notice, this list of conditions and the following disclaimer. 10# 2. Redistributions in binary form must reproduce the above copyright 11# notice, this list of conditions and the following disclaimer in the 12# documentation and/or other materials provided with the distribution. 13# 3. Neither the name of the copyright holder nor the 14# names of its contributors may be used to endorse or promote products 15# derived from this software without specific prior written permission. 16# 17# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 18# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 19# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 20# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE 21# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 22# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 23# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 24# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 25# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 26# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 27# POSSIBILITY OF SUCH DAMAGE. 28# 29 30import unittest 31 32import command 33import config 34import mesh_cop 35import thread_cert 36from pktverify.consts import MLE_DATA_RESPONSE, LEAD_PET_URI, LEAD_KA_URI, MGMT_COMMISSIONER_SET_URI, NM_CHANNEL_TLV, NM_COMMISSIONER_ID_TLV, NM_COMMISSIONER_SESSION_ID_TLV, NM_STATE_TLV, NM_STEERING_DATA_TLV, NM_BORDER_AGENT_LOCATOR_TLV, LEADER_DATA_TLV, NETWORK_DATA_TLV, ACTIVE_TIMESTAMP_TLV, SOURCE_ADDRESS_TLV, NWD_COMMISSIONING_DATA_TLV, MESHCOP_ACCEPT, MESHCOP_REJECT, LEADER_ALOC 37from pktverify.packet_verifier import PacketVerifier 38from pktverify.bytes import Bytes 39 40COMMISSIONER = 1 41LEADER = 2 42 43# Test Purpose and Description: 44# ----------------------------- 45# The purpose of this test case is to verify Leader's and active Commissioner's behavior via 46# MGMT_COMMISSIONER_SET request and response 47# 48# Test Topology: 49# ------------- 50# Commissioner 51# | 52# Leader 53# 54# DUT Types: 55# ---------- 56# Leader 57# Commissioner 58 59 60class Cert_9_2_02_MGMTCommissionerSet(thread_cert.TestCase): 61 SUPPORT_NCP = False 62 63 TOPOLOGY = { 64 COMMISSIONER: { 65 'name': 'COMMISSIONER', 66 'mode': 'rdn', 67 'allowlist': [LEADER] 68 }, 69 LEADER: { 70 'name': 'LEADER', 71 'mode': 'rdn', 72 'allowlist': [COMMISSIONER] 73 }, 74 } 75 76 def test(self): 77 self.nodes[LEADER].start() 78 self.simulator.go(config.LEADER_STARTUP_DELAY) 79 self.assertEqual(self.nodes[LEADER].get_state(), 'leader') 80 81 self.nodes[COMMISSIONER].start() 82 self.simulator.go(config.ROUTER_STARTUP_DELAY) 83 self.assertEqual(self.nodes[COMMISSIONER].get_state(), 'router') 84 self.simulator.get_messages_sent_by(LEADER) 85 86 self.collect_rlocs() 87 self.collect_rloc16s() 88 89 self.nodes[COMMISSIONER].commissioner_start() 90 self.simulator.go(3) 91 92 leader_messages = self.simulator.get_messages_sent_by(LEADER) 93 msg = leader_messages.next_coap_message('2.04', assert_enabled=True) 94 commissioner_session_id_tlv = command.get_sub_tlv(msg.coap.payload, mesh_cop.CommissionerSessionId) 95 96 steering_data_tlv = mesh_cop.SteeringData(bytes([0xff])) 97 self.nodes[COMMISSIONER].commissioner_mgmtset_with_tlvs([steering_data_tlv]) 98 self.simulator.go(5) 99 100 self.nodes[COMMISSIONER].commissioner_mgmtset_with_tlvs([steering_data_tlv, commissioner_session_id_tlv]) 101 self.simulator.go(5) 102 103 border_agent_locator_tlv = mesh_cop.BorderAgentLocator(0x0400) 104 self.nodes[COMMISSIONER].commissioner_mgmtset_with_tlvs( 105 [commissioner_session_id_tlv, border_agent_locator_tlv]) 106 self.simulator.go(5) 107 108 self.nodes[COMMISSIONER].commissioner_mgmtset_with_tlvs([ 109 steering_data_tlv, 110 commissioner_session_id_tlv, 111 border_agent_locator_tlv, 112 ]) 113 self.simulator.go(5) 114 115 self.nodes[COMMISSIONER].commissioner_mgmtset_with_tlvs( 116 [mesh_cop.CommissionerSessionId(0xffff), steering_data_tlv]) 117 self.simulator.go(5) 118 119 self.nodes[COMMISSIONER].commissioner_mgmtset_with_tlvs([ 120 commissioner_session_id_tlv, 121 steering_data_tlv, 122 mesh_cop.Channel(0x0, 0x0), 123 ]) 124 self.simulator.go(5) 125 126 leader_rloc = self.nodes[LEADER].get_rloc() 127 commissioner_rloc = self.nodes[COMMISSIONER].get_rloc() 128 self.assertTrue(self.nodes[COMMISSIONER].ping(leader_rloc)) 129 self.simulator.go(1) 130 self.assertTrue(self.nodes[LEADER].ping(commissioner_rloc)) 131 132 def verify(self, pv): 133 pkts = pv.pkts 134 pv.summary.show() 135 136 LEADER = pv.vars['LEADER'] 137 LEADER_RLOC = pv.vars['LEADER_RLOC'] 138 LEADER_RLOC16 = pv.vars['LEADER_RLOC16'] 139 COMMISSIONER = pv.vars['COMMISSIONER'] 140 COMMISSIONER_RLOC = pv.vars['COMMISSIONER_RLOC'] 141 142 # Step 1: Ensure topology is formed correctly 143 pv.verify_attached('COMMISSIONER', 'LEADER') 144 145 # Step 2: Commissioner sends a Set Commissioner Dataset Request (MGMT_COMMISSIONER_SET.req) 146 # to Leader Anycast or Routing Locator: 147 # CoAP Request URI 148 # CON POST coap://<L>:MM/c/cs 149 # CoAP Payload 150 # (missing Commissioner Session ID TLV) 151 # Steering Data TLV (0xFF) 152 _mgmt_set_pkt = pkts.filter_wpan_src64(COMMISSIONER).\ 153 filter_ipv6_2dsts(LEADER_ALOC, LEADER_RLOC).\ 154 filter_coap_request(MGMT_COMMISSIONER_SET_URI).\ 155 filter(lambda p: 156 [NM_STEERING_DATA_TLV] == p.coap.tlv.type and\ 157 p.thread_meshcop.tlv.steering_data == Bytes('ff') 158 ).\ 159 must_next() 160 161 # Step 3: Leader sends a Set Commissioner Dataset Response (MGMT_COMMISSIONER_SET.rsp) to 162 # Commissioner: 163 # CoAP Response Code 164 # 2.04 Changed 165 # CoAP Payload 166 # State TLV (value = Reject) 167 pkts.filter_ipv6_src_dst(_mgmt_set_pkt.ipv6.dst, COMMISSIONER_RLOC).\ 168 filter_coap_ack(MGMT_COMMISSIONER_SET_URI).\ 169 filter(lambda p: 170 [NM_STATE_TLV] == p.coap.tlv.type and\ 171 p.thread_meshcop.tlv.state == MESHCOP_REJECT 172 ).\ 173 must_next() 174 175 # Step 4: Commissioner sends a Set Commissioner Dataset Request (MGMT_COMMISSIONER_SET.req) 176 # to Leader Anycast or Routing Locator: 177 # CoAP Request URI 178 # CON POST coap://<L>:MM/c/cs 179 # CoAP Payload 180 # Commissioner Session ID TLV 181 # Steering Data TLV (0xFF) 182 _mgmt_set_pkt = pkts.filter_wpan_src64(COMMISSIONER).\ 183 filter_ipv6_2dsts(LEADER_ALOC, LEADER_RLOC).\ 184 filter_coap_request(MGMT_COMMISSIONER_SET_URI).\ 185 filter(lambda p: { 186 NM_COMMISSIONER_SESSION_ID_TLV, 187 NM_STEERING_DATA_TLV 188 } == set(p.thread_meshcop.tlv.type) and\ 189 p.thread_meshcop.tlv.steering_data == Bytes('ff') 190 ).\ 191 must_next() 192 193 # Step 5: Leader sends a Set Commissioner Dataset Response (MGMT_COMMISSIONER_SET.rsp) to 194 # Commissioner: 195 # CoAP Response Code 196 # 2.04 Changed 197 # CoAP Payload 198 # State TLV (value = Accept) 199 pkts.filter_ipv6_src_dst(_mgmt_set_pkt.ipv6.dst, COMMISSIONER_RLOC).\ 200 filter_coap_ack(MGMT_COMMISSIONER_SET_URI).\ 201 filter(lambda p: 202 [NM_STATE_TLV] == p.coap.tlv.type and\ 203 p.thread_meshcop.tlv.state == MESHCOP_ACCEPT 204 ).\ 205 must_next() 206 207 # Step 6: Leader sends a MLE Data Response to the network with the 208 # following TLVs: 209 # - Active Timestamp TLV 210 # - Leader Data TLV 211 # - Network Data TLV 212 # - Source Address TLV 213 pkts.filter_wpan_src64(LEADER).\ 214 filter_LLANMA().\ 215 filter_mle_cmd(MLE_DATA_RESPONSE).\ 216 filter(lambda p: { 217 NETWORK_DATA_TLV, 218 SOURCE_ADDRESS_TLV, 219 ACTIVE_TIMESTAMP_TLV, 220 LEADER_DATA_TLV 221 } == set(p.mle.tlv.type) and\ 222 { 223 NWD_COMMISSIONING_DATA_TLV 224 } == set(p.thread_nwd.tlv.type) and\ 225 { 226 NM_BORDER_AGENT_LOCATOR_TLV, 227 NM_COMMISSIONER_SESSION_ID_TLV, 228 NM_STEERING_DATA_TLV 229 } == set(p.thread_meshcop.tlv.type) and\ 230 p.thread_nwd.tlv.stable == [0] 231 ).\ 232 must_next() 233 234 # Step 7: Commissioner sends a Set Commissioner Dataset Request (MGMT_COMMISSIONER_SET.req) 235 # to Leader Anycast or Routing Locator: 236 # CoAP Request URI 237 # CON POST coap://<L>:MM/c/cs 238 # CoAP Payload 239 # Commissioner Session ID TLV 240 # Border Agent Locator TLV (0x0400) (not allowed TLV) 241 _mgmt_set_pkt = pkts.filter_wpan_src64(COMMISSIONER).\ 242 filter_ipv6_2dsts(LEADER_ALOC, LEADER_RLOC).\ 243 filter_coap_request(MGMT_COMMISSIONER_SET_URI).\ 244 filter(lambda p: { 245 NM_COMMISSIONER_SESSION_ID_TLV, 246 NM_BORDER_AGENT_LOCATOR_TLV 247 } == set(p.thread_meshcop.tlv.type) and\ 248 p.thread_meshcop.tlv.ba_locator == 0x0400 249 ).\ 250 must_next() 251 252 # Step 8: Leader sends a Set Commissioner Dataset Response (MGMT_COMMISSIONER_SET.rsp) to 253 # Commissioner: 254 # CoAP Response Code 255 # 2.04 Changed 256 # CoAP Payload 257 # State TLV (value = Reject) 258 pkts.filter_ipv6_src_dst(_mgmt_set_pkt.ipv6.dst, COMMISSIONER_RLOC).\ 259 filter_coap_ack(MGMT_COMMISSIONER_SET_URI).\ 260 filter(lambda p: 261 [NM_STATE_TLV] == p.coap.tlv.type and\ 262 p.thread_meshcop.tlv.state == MESHCOP_REJECT 263 ).\ 264 must_next() 265 266 # Step 9: Commissioner sends a Set Commissioner Dataset Request (MGMT_COMMISSIONER_SET.req) 267 # to Leader Anycast or Routing Locator: 268 # CoAP Request URI 269 # CON POST coap://<L>:MM/c/cs 270 # CoAP Payload 271 # Commissioner Session ID TLV 272 # Steering Data TLV (0xFF) 273 # Border Agent Locator TLV (0x0400) (not allowed TLV) 274 _mgmt_set_pkt = pkts.filter_wpan_src64(COMMISSIONER).\ 275 filter_ipv6_2dsts(LEADER_ALOC, LEADER_RLOC).\ 276 filter_coap_request(MGMT_COMMISSIONER_SET_URI).\ 277 filter(lambda p: { 278 NM_COMMISSIONER_SESSION_ID_TLV, 279 NM_STEERING_DATA_TLV, 280 NM_BORDER_AGENT_LOCATOR_TLV 281 } == set(p.thread_meshcop.tlv.type) and\ 282 p.thread_meshcop.tlv.ba_locator == 0x0400 and\ 283 p.thread_meshcop.tlv.steering_data == Bytes('ff') 284 ).\ 285 must_next() 286 287 # Step 10: Leader sends a Set Commissioner Dataset Response (MGMT_COMMISSIONER_SET.rsp) to 288 # Commissioner: 289 # CoAP Response Code 290 # 2.04 Changed 291 # CoAP Payload 292 # State TLV (value = Reject) 293 pkts.filter_ipv6_src_dst(_mgmt_set_pkt.ipv6.dst, COMMISSIONER_RLOC).\ 294 filter_coap_ack(MGMT_COMMISSIONER_SET_URI).\ 295 filter(lambda p: 296 [NM_STATE_TLV] == p.coap.tlv.type and\ 297 p.thread_meshcop.tlv.state == MESHCOP_REJECT 298 ).\ 299 must_next() 300 301 # Step 11: Commissioner sends a Set Commissioner Dataset Request (MGMT_COMMISSIONER_SET.req) 302 # to Leader Anycast or Routing Locator: 303 # CoAP Request URI 304 # CON POST coap://<L>:MM/c/cs 305 # CoAP Payload 306 # Commissioner Session ID TLV (0xFFFF) (invalid value) 307 # Steering Data TLV (0xFF) 308 _mgmt_set_pkt = pkts.filter_wpan_src64(COMMISSIONER).\ 309 filter_ipv6_2dsts(LEADER_ALOC, LEADER_RLOC).\ 310 filter_coap_request(MGMT_COMMISSIONER_SET_URI).\ 311 filter(lambda p: { 312 NM_COMMISSIONER_SESSION_ID_TLV, 313 NM_STEERING_DATA_TLV 314 } == set(p.thread_meshcop.tlv.type) and\ 315 p.thread_meshcop.tlv.commissioner_sess_id == 0xFFFF and\ 316 p.thread_meshcop.tlv.steering_data == Bytes('ff') 317 ).\ 318 must_next() 319 320 # Step 12: Leader sends a Set Commissioner Dataset Response (MGMT_COMMISSIONER_SET.rsp) to 321 # Commissioner: 322 # CoAP Response Code 323 # 2.04 Changed 324 # CoAP Payload 325 # State TLV (value = Reject) 326 pkts.filter_ipv6_src_dst(_mgmt_set_pkt.ipv6.dst, COMMISSIONER_RLOC).\ 327 filter_coap_ack(MGMT_COMMISSIONER_SET_URI).\ 328 filter(lambda p: 329 [NM_STATE_TLV] == p.coap.tlv.type and\ 330 p.thread_meshcop.tlv.state == MESHCOP_REJECT 331 ).\ 332 must_next() 333 334 # Step 13: Commissioner sends a Set Commissioner Dataset Request (MGMT_COMMISSIONER_SET.req) 335 # to Leader Anycast or Routing Locator: 336 # CoAP Request URI 337 # CON POST coap://<L>:MM/c/cs 338 # CoAP Payload 339 # Commissioner Session ID TLV 340 # Steering Data TLV (0xFF) 341 # Channel TLV (not allowed TLV) 342 _mgmt_set_pkt = pkts.filter_wpan_src64(COMMISSIONER).\ 343 filter_ipv6_2dsts(LEADER_ALOC, LEADER_RLOC).\ 344 filter_coap_request(MGMT_COMMISSIONER_SET_URI).\ 345 filter(lambda p: { 346 NM_COMMISSIONER_SESSION_ID_TLV, 347 NM_STEERING_DATA_TLV, 348 NM_CHANNEL_TLV 349 } == set(p.thread_meshcop.tlv.type) and\ 350 p.thread_meshcop.tlv.steering_data == Bytes('ff') 351 ).\ 352 must_next() 353 354 # Step 14: Leader sends a Set Commissioner Dataset Response (MGMT_COMMISSIONER_SET.rsp) to 355 # Commissioner: 356 # CoAP Response Code 357 # 2.04 Changed 358 # CoAP Payload 359 # State TLV (value = Accept) 360 pkts.filter_ipv6_src_dst(_mgmt_set_pkt.ipv6.dst, COMMISSIONER_RLOC).\ 361 filter_coap_ack(MGMT_COMMISSIONER_SET_URI).\ 362 filter(lambda p: 363 [NM_STATE_TLV] == p.coap.tlv.type and\ 364 p.thread_meshcop.tlv.state == MESHCOP_ACCEPT 365 ).\ 366 must_next() 367 368 # Step 15: Verify connectivity by sending an ICMPv6 Echo Request to the DUT mesh local address 369 _pkt = pkts.filter_ping_request().\ 370 filter_ipv6_src_dst(COMMISSIONER_RLOC, LEADER_RLOC).\ 371 must_next() 372 pkts.filter_ping_reply(identifier=_pkt.icmpv6.echo.identifier).\ 373 filter_ipv6_src_dst(LEADER_RLOC, COMMISSIONER_RLOC).\ 374 must_next() 375 376 _pkt = pkts.filter_ping_request().\ 377 filter_ipv6_src_dst(LEADER_RLOC, COMMISSIONER_RLOC).\ 378 must_next() 379 pkts.filter_ping_reply(identifier=_pkt.icmpv6.echo.identifier).\ 380 filter_ipv6_src_dst(COMMISSIONER_RLOC, LEADER_RLOC).\ 381 must_next() 382 383 384if __name__ == '__main__': 385 unittest.main() 386