1#!/usr/bin/env python3 2# 3# Copyright (c) 2020, The OpenThread Authors. 4# All rights reserved. 5# 6# Redistribution and use in source and binary forms, with or without 7# modification, are permitted provided that the following conditions are met: 8# 1. Redistributions of source code must retain the above copyright 9# notice, this list of conditions and the following disclaimer. 10# 2. Redistributions in binary form must reproduce the above copyright 11# notice, this list of conditions and the following disclaimer in the 12# documentation and/or other materials provided with the distribution. 13# 3. Neither the name of the copyright holder nor the 14# names of its contributors may be used to endorse or promote products 15# derived from this software without specific prior written permission. 16# 17# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 18# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 19# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 20# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE 21# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 22# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 23# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 24# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 25# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 26# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 27# POSSIBILITY OF SUCH DAMAGE. 28# 29 30import unittest 31 32import config 33import thread_cert 34from pktverify.consts import MLE_DATA_RESPONSE, MGMT_COMMISSIONER_GET_URI, NM_CHANNEL_TLV, NM_COMMISSIONER_ID_TLV, NM_COMMISSIONER_SESSION_ID_TLV, NM_STEERING_DATA_TLV, NM_BORDER_AGENT_LOCATOR_TLV, LEADER_DATA_TLV, NETWORK_DATA_TLV, ACTIVE_TIMESTAMP_TLV, SOURCE_ADDRESS_TLV, NWD_COMMISSIONING_DATA_TLV, NM_PAN_ID_TLV, NM_NETWORK_NAME_TLV 35from pktverify.packet_verifier import PacketVerifier 36from pktverify.null_field import nullField 37 38COMMISSIONER = 1 39LEADER = 2 40 41# Test Purpose and Description: 42# ----------------------------- 43# The purpose of this test case is to verify Leader's and active Commissioner's behavior via 44# MGMT_COMMISSIONER_GET request and response 45# 46# Test Topology: 47# ------------- 48# Commissioner 49# | 50# Leader 51# 52# DUT Types: 53# ---------- 54# Leader 55# Commissioner 56 57 58class Cert_9_2_01_MGMTCommissionerGet(thread_cert.TestCase): 59 SUPPORT_NCP = False 60 USE_MESSAGE_FACTORY = True 61 62 TOPOLOGY = { 63 COMMISSIONER: { 64 'name': 'COMMISSIONER', 65 'mode': 'rdn', 66 'allowlist': [LEADER] 67 }, 68 LEADER: { 69 'name': 'LEADER', 70 'mode': 'rdn', 71 'allowlist': [COMMISSIONER] 72 }, 73 } 74 75 def test(self): 76 self.nodes[LEADER].start() 77 self.simulator.go(config.LEADER_STARTUP_DELAY) 78 self.assertEqual(self.nodes[LEADER].get_state(), 'leader') 79 80 self.nodes[COMMISSIONER].start() 81 self.simulator.go(config.ROUTER_STARTUP_DELAY) 82 self.assertEqual(self.nodes[COMMISSIONER].get_state(), 'router') 83 84 self.collect_leader_aloc(LEADER) 85 self.collect_rlocs() 86 self.collect_rloc16s() 87 88 self.nodes[COMMISSIONER].commissioner_start() 89 self.simulator.go(3) 90 91 self.nodes[COMMISSIONER].commissioner_mgmtget() 92 self.simulator.go(5) 93 94 self.nodes[COMMISSIONER].commissioner_mgmtget('0b08') 95 self.simulator.go(5) 96 97 self.nodes[COMMISSIONER].commissioner_mgmtget('0b01') 98 self.simulator.go(5) 99 100 self.nodes[COMMISSIONER].commissioner_mgmtget('0903') 101 self.simulator.go(5) 102 103 leader_rloc = self.nodes[LEADER].get_rloc() 104 commissioner_rloc = self.nodes[COMMISSIONER].get_rloc() 105 self.assertTrue(self.nodes[COMMISSIONER].ping(leader_rloc)) 106 self.simulator.go(1) 107 self.assertTrue(self.nodes[LEADER].ping(commissioner_rloc)) 108 109 def verify(self, pv): 110 pkts = pv.pkts 111 pv.summary.show() 112 113 LEADER = pv.vars['LEADER'] 114 LEADER_ALOC = pv.vars['LEADER_ALOC'] 115 LEADER_RLOC = pv.vars['LEADER_RLOC'] 116 COMMISSIONER = pv.vars['COMMISSIONER'] 117 COMMISSIONER_RLOC = pv.vars['COMMISSIONER_RLOC'] 118 119 # Step 1: Ensure topology is formed correctly 120 pv.verify_attached('COMMISSIONER', 'LEADER') 121 122 # Step 2: Commissioner sends a MGMT_COMMISSIONER_GET.req to Leader Anycast 123 # or Routing Locator: 124 # CoAP Request URI 125 # CON POST coap://<L>:MM/c/cg 126 # CoAP Payload 127 # <empty> - get all Commissioner Dataset parameters 128 _pkt = pkts.filter_wpan_src64(COMMISSIONER).\ 129 filter_ipv6_2dsts(LEADER_ALOC, LEADER_RLOC).\ 130 filter_coap_request(MGMT_COMMISSIONER_GET_URI).\ 131 filter(lambda p: p.coap.tlv.type is nullField).\ 132 must_next() 133 134 # Step 3: Leader sends a MGMT_COMMISSIONER_GET.rsp to Commissioner with 135 # the following format: 136 # CoAP Response Code 137 # 2.04 Changed 138 # CoAP Payload 139 # (entire Commissioner Dataset) 140 # Border Agent Locator TLV 141 # Commissioner Session ID TLV 142 # Steering Data TLV 143 pkts.filter_ipv6_src_dst(_pkt.ipv6.dst, COMMISSIONER_RLOC).\ 144 filter_coap_ack(MGMT_COMMISSIONER_GET_URI).\ 145 filter(lambda p: { 146 NM_COMMISSIONER_SESSION_ID_TLV, 147 NM_BORDER_AGENT_LOCATOR_TLV, 148 NM_STEERING_DATA_TLV 149 } <= set(p.thread_meshcop.tlv.type) and\ 150 p.thread_meshcop.tlv.ba_locator is not nullField and\ 151 p.thread_meshcop.tlv.commissioner_sess_id is not nullField and\ 152 p.thread_meshcop.tlv.steering_data is not nullField 153 ).\ 154 must_next() 155 156 # Step 4: Commissioner sends a MGMT_COMMISSIONER_GET.req to Leader Anycast 157 # or Routing Locator: 158 # CoAP Request URI 159 # CON POST coap://<L>:MM/c/cg 160 # CoAP Payload 161 # Commissioner Session ID TLV 162 # Steering Data TLV 163 _mgmt_get_pkt = pkts.filter_wpan_src64(COMMISSIONER).\ 164 filter_ipv6_2dsts(LEADER_ALOC, LEADER_RLOC).\ 165 filter_coap_request(MGMT_COMMISSIONER_GET_URI).\ 166 filter(lambda p: { 167 NM_COMMISSIONER_SESSION_ID_TLV, 168 NM_STEERING_DATA_TLV 169 } <= set(p.thread_meshcop.tlv.type) 170 ).\ 171 must_next() 172 173 # Step 5: Leader sends a MGMT_COMMISSIONER_GET.rsp to Commissioner with 174 # the following format: 175 # CoAP Response Code 176 # 2.04 Changed 177 # CoAP Payload 178 # Encoded values for the requested Commissioner Dataset parameters 179 # Commissioner Session ID TLV 180 # Steering Data TLV 181 pkts.filter_ipv6_src_dst(_mgmt_get_pkt.ipv6.dst, COMMISSIONER_RLOC).\ 182 filter_coap_ack(MGMT_COMMISSIONER_GET_URI).\ 183 filter(lambda p: { 184 NM_COMMISSIONER_SESSION_ID_TLV, 185 NM_STEERING_DATA_TLV 186 } <= set(p.thread_meshcop.tlv.type) and\ 187 p.thread_meshcop.tlv.commissioner_sess_id is not nullField and\ 188 p.thread_meshcop.tlv.steering_data is not nullField 189 ).\ 190 must_next() 191 192 # Step 6: Commissioner sends a MGMT_COMMISSIONER_GET.req to Leader Anycast 193 # or Routing Locator: 194 # CoAP Request URI 195 # CON POST coap://<L>:MM/c/cg 196 # CoAP Payload 197 # Commissioner Session ID TLV 198 # PAN ID TLV 199 _mgmt_get_pkt = pkts.filter_wpan_src64(COMMISSIONER).\ 200 filter_ipv6_2dsts(LEADER_ALOC, LEADER_RLOC).\ 201 filter_coap_request(MGMT_COMMISSIONER_GET_URI).\ 202 filter(lambda p: { 203 NM_COMMISSIONER_SESSION_ID_TLV, 204 NM_PAN_ID_TLV 205 } <= set(p.thread_meshcop.tlv.type) 206 ).\ 207 must_next() 208 209 # Step 7: Leader sends a MGMT_COMMISSIONER_GET.rsp to Commissioner with 210 # the following format: 211 # CoAP Response Code 212 # 2.04 Changed 213 # CoAP Payload 214 # Encoded values for the requested Commissioner Dataset parameters 215 # Commissioner Session ID TLV 216 # (PAN ID TLV in Get TLV is ignored) 217 pkts.filter_ipv6_src_dst(_mgmt_get_pkt.ipv6.dst, COMMISSIONER_RLOC).\ 218 filter_coap_ack(MGMT_COMMISSIONER_GET_URI).\ 219 filter(lambda p: { 220 NM_COMMISSIONER_SESSION_ID_TLV 221 } <= set(p.thread_meshcop.tlv.type) and\ 222 p.thread_meshcop.tlv.commissioner_sess_id is not nullField and\ 223 p.thread_meshcop.tlv.pan_id is nullField 224 ).\ 225 must_next() 226 227 # Step 8: Commissioner sends a MGMT_COMMISSIONER_GET.req to Leader Anycast 228 # or Routing Locator: 229 # CoAP Request URI 230 # CON POST coap://<L>:MM/c/cg 231 # CoAP Payload 232 # Border Agent Locator TLV 233 # Network Name TLV 234 _mgmt_get_pkt = pkts.filter_wpan_src64(COMMISSIONER).\ 235 filter_ipv6_2dsts(LEADER_ALOC, LEADER_RLOC).\ 236 filter_coap_request(MGMT_COMMISSIONER_GET_URI).\ 237 filter(lambda p: { 238 NM_BORDER_AGENT_LOCATOR_TLV, 239 NM_NETWORK_NAME_TLV 240 } <= set(p.thread_meshcop.tlv.type) 241 ).\ 242 must_next() 243 244 # Step 9: Leader sends a MGMT_COMMISSIONER_GET.rsp to Commissioner with 245 # the following format: 246 # CoAP Response Code 247 # 2.04 Changed 248 # CoAP Payload 249 # Encoded values for the requested Commissioner Dataset parameters 250 # Border Agent Locator TLV 251 # (Network Name TLV in Get TLV is ignored) 252 pkts.filter_ipv6_src_dst(_mgmt_get_pkt.ipv6.dst, COMMISSIONER_RLOC).\ 253 filter_coap_ack(MGMT_COMMISSIONER_GET_URI).\ 254 filter(lambda p: { 255 NM_BORDER_AGENT_LOCATOR_TLV 256 } <= set(p.thread_meshcop.tlv.type) and\ 257 p.thread_meshcop.tlv.ba_locator is not nullField and\ 258 p.thread_meshcop.tlv.net_name is nullField 259 ).\ 260 must_next() 261 262 # Step 10: Verify connectivity by sending an ICMPv6 Echo Request to the DUT mesh local address 263 _pkt = pkts.filter_ping_request().\ 264 filter_ipv6_src_dst(COMMISSIONER_RLOC, LEADER_RLOC).\ 265 must_next() 266 pkts.filter_ping_reply(identifier=_pkt.icmpv6.echo.identifier).\ 267 filter_ipv6_src_dst(LEADER_RLOC, COMMISSIONER_RLOC).\ 268 must_next() 269 270 _pkt = pkts.filter_ping_request().\ 271 filter_ipv6_src_dst(LEADER_RLOC, COMMISSIONER_RLOC).\ 272 must_next() 273 pkts.filter_ping_reply(identifier=_pkt.icmpv6.echo.identifier).\ 274 filter_ipv6_src_dst(COMMISSIONER_RLOC, LEADER_RLOC).\ 275 must_next() 276 277 278if __name__ == '__main__': 279 unittest.main() 280