#!/usr/bin/env python3
#
# Copyright (c) 2020, The OpenThread Authors.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
# 1. Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
#    notice, this list of conditions and the following disclaimer in the
#    documentation and/or other materials provided with the distribution.
# 3. Neither the name of the copyright holder nor the
#    names of its contributors may be used to endorse or promote products
#    derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#

import unittest

import config
import mesh_cop
import thread_cert
from pktverify.consts import (
    MLE_DATA_RESPONSE,
    MGMT_ACTIVE_GET_URI,
    NM_CHANNEL_TLV,
    NM_COMMISSIONER_ID_TLV,
    NM_COMMISSIONER_SESSION_ID_TLV,
    NM_STEERING_DATA_TLV,
    NM_BORDER_AGENT_LOCATOR_TLV,
    NM_PAN_ID_TLV,
    NM_NETWORK_NAME_TLV,
    NM_NETWORK_MESH_LOCAL_PREFIX_TLV,
    NM_PSKC_TLV,
    NM_SCAN_DURATION,
    NM_ENERGY_LIST_TLV,
    NM_ACTIVE_TIMESTAMP_TLV,
    NM_CHANNEL_MASK_TLV,
    NM_EXTENDED_PAN_ID_TLV,
    NM_NETWORK_KEY_TLV,
    NM_SECURITY_POLICY_TLV,
    LEADER_ALOC,
)
from pktverify.packet_verifier import PacketVerifier
from pktverify.null_field import nullField

# Node identifiers used as keys of TOPOLOGY and of self.nodes.
COMMISSIONER = 1
LEADER = 2

# Test Purpose and Description:
# -----------------------------
# The purpose of this test case is to verify Leader's and active Commissioner's behavior via
# MGMT_ACTIVE_GET request and response
#
# Test Topology:
# -------------
# Commissioner
#    |
#  Leader
#
# DUT Types:
# ----------
#  Leader
#  Commissioner


class Cert_9_2_03_ActiveDatasetGet(thread_cert.TestCase):
    # Two-node scenario: a Commissioner issues three MGMT_ACTIVE_GET requests
    # to the Leader; verify() then checks the captured packets step by step.
    SUPPORT_NCP = False
    USE_MESSAGE_FACTORY = False

    # Each node only allowlists the other one.
    TOPOLOGY = {
        COMMISSIONER: {
            'name': 'COMMISSIONER',
            'mode': 'rdn',
            'allowlist': [LEADER]
        },
        LEADER: {
            'name': 'LEADER',
            'mode': 'rdn',
            'allowlist': [COMMISSIONER]
        },
    }

    def test(self):
        # Drive the scenario: form the network, start the commissioner role,
        # send the three MGMT_ACTIVE_GET requests, then ping in both directions.
        leader = self.nodes[LEADER]
        commissioner = self.nodes[COMMISSIONER]

        # The Leader is started first and must report the 'leader' state.
        leader.start()
        self.simulator.go(config.LEADER_STARTUP_DELAY)
        self.assertEqual(leader.get_state(), 'leader')

        # The second node must report the 'router' state after its startup delay.
        commissioner.start()
        self.simulator.go(config.ROUTER_STARTUP_DELAY)
        self.assertEqual(commissioner.get_state(), 'router')

        self.collect_rlocs()
        self.collect_rloc16s()

        leader_rloc = leader.get_rloc()

        commissioner.commissioner_start()
        self.simulator.go(3)

        # Positional arguments of each MGMT_ACTIVE_GET request, in send order:
        #   1. no arguments at all,
        #   2. Leader RLOC + Channel Mask / Mesh-Local Prefix / Network Name,
        #   3. Leader RLOC + Channel / Mesh-Local Prefix / Network Name plus
        #      Scan Duration and Energy List.
        get_requests = [
            (),
            (leader_rloc, [
                mesh_cop.TlvType.CHANNEL_MASK,
                mesh_cop.TlvType.NETWORK_MESH_LOCAL_PREFIX,
                mesh_cop.TlvType.NETWORK_NAME,
            ]),
            (leader_rloc, [
                mesh_cop.TlvType.CHANNEL,
                mesh_cop.TlvType.NETWORK_MESH_LOCAL_PREFIX,
                mesh_cop.TlvType.NETWORK_NAME,
                mesh_cop.TlvType.SCAN_DURATION,
                mesh_cop.TlvType.ENERGY_LIST,
            ]),
        ]
        for request_args in get_requests:
            commissioner.send_mgmt_active_get(*request_args)
            self.simulator.go(5)

        # Connectivity check in both directions.
        commissioner_rloc = commissioner.get_rloc()
        self.assertTrue(commissioner.ping(leader_rloc))
        self.simulator.go(1)
        self.assertTrue(leader.ping(commissioner_rloc))

    def verify(self, pv):
        # Check the captured packets against the numbered steps below.
        pkts = pv.pkts
        pv.summary.show()

        # 'leader' is not referenced below; the lookup is kept as in the
        # original so the same pv.vars keys are read.
        leader = pv.vars['LEADER']
        leader_rloc = pv.vars['LEADER_RLOC']
        commissioner = pv.vars['COMMISSIONER']
        commissioner_rloc = pv.vars['COMMISSIONER_RLOC']

        def has_tlvs(required):
            # Build a packet predicate: true when every TLV type in `required`
            # appears among the packet's thread_meshcop TLV types.
            return lambda p: required <= set(p.thread_meshcop.tlv.type)

        # Step 1: Ensure topology is formed correctly
        pv.verify_attached('COMMISSIONER', 'LEADER')

        # Step 2: Commissioner sends a MGMT_ACTIVE_GET.req to Leader Anycast
        #         or Routing Locator:
        #         CoAP Request URI
        #             CON POST coap://<L>:MM/c/ag
        #         CoAP Payload
        #             <empty> - get all Active Operational Dataset parameters
        get_all_req = (
            pkts.filter_wpan_src64(commissioner)
            .filter_ipv6_2dsts(LEADER_ALOC, leader_rloc)
            .filter_coap_request(MGMT_ACTIVE_GET_URI)
            .filter(lambda p: p.coap.tlv.type is nullField)
            .must_next()
        )

        # Step 3: Leader sends a MGMT_ACTIVE_GET.rsp to Commissioner with
        #         the following format:
        #         CoAP Response Code
        #             2.04 Changed
        #         CoAP Payload
        #             (entire Active Operational Dataset)
        #             Active Timestamp TLV
        #             Channel TLV
        #             Channel Mask TLV
        #             Extended PAN ID TLV
        #             Network Mesh-Local Prefix TLV
        #             Network Key TLV
        #             Network Name TLV
        #             PAN ID TLV
        #             PSKc TLV
        #             Security Policy TLV
        entire_dataset_tlvs = {
            NM_ACTIVE_TIMESTAMP_TLV,
            NM_CHANNEL_TLV,
            NM_CHANNEL_MASK_TLV,
            NM_EXTENDED_PAN_ID_TLV,
            NM_NETWORK_MESH_LOCAL_PREFIX_TLV,
            NM_NETWORK_KEY_TLV,
            NM_NETWORK_NAME_TLV,
            NM_PAN_ID_TLV,
            NM_PSKC_TLV,
            NM_SECURITY_POLICY_TLV,
        }
        # The response source is whichever destination the request in Step 2
        # actually used.
        (
            pkts.filter_ipv6_src_dst(get_all_req.ipv6.dst, commissioner_rloc)
            .filter_coap_ack(MGMT_ACTIVE_GET_URI)
            .filter(has_tlvs(entire_dataset_tlvs))
            .must_next()
        )

        # Step 4: Commissioner sends a MGMT_ACTIVE_GET.req to Leader Anycast
        #         or Routing Locator:
        #         CoAP Request URI
        #             CON POST coap://<L>:MM/c/ag
        #         CoAP Payload
        #             Channel Mask TLV
        #             Network Mesh-Local Prefix TLV
        #             Network Name TLV
        mask_prefix_name_tlvs = {
            NM_CHANNEL_MASK_TLV,
            NM_NETWORK_MESH_LOCAL_PREFIX_TLV,
            NM_NETWORK_NAME_TLV,
        }
        (
            pkts.filter_wpan_src64(commissioner)
            .filter_ipv6_2dsts(LEADER_ALOC, leader_rloc)
            .filter_coap_request(MGMT_ACTIVE_GET_URI)
            .filter(has_tlvs(mask_prefix_name_tlvs))
            .must_next()
        )

        # Step 5: Leader sends a MGMT_ACTIVE_GET.rsp to Commissioner with
        #         the following format:
        #         CoAP Response Code
        #             2.04 Changed
        #         CoAP Payload
        #             Channel Mask TLV
        #             Network Mesh-Local Prefix TLV
        #             Network Name TLV
        (
            pkts.filter_ipv6_src_dst(leader_rloc, commissioner_rloc)
            .filter_coap_ack(MGMT_ACTIVE_GET_URI)
            .filter(has_tlvs(mask_prefix_name_tlvs))
            .must_next()
        )

        # Step 6: Commissioner sends a MGMT_ACTIVE_GET.req to Leader Anycast
        #         or Routing Locator:
        #         CoAP Request URI
        #             CON POST coap://<L>:MM/c/ag
        #         CoAP Payload
        #             Channel TLV
        #             Network Mesh-Local Prefix TLV
        #             Network Name TLV
        #             Scan Duration TLV (not allowed TLV)
        #             Energy List TLV (not allowed TLV)
        request_with_disallowed_tlvs = {
            NM_CHANNEL_TLV,
            NM_NETWORK_MESH_LOCAL_PREFIX_TLV,
            NM_NETWORK_NAME_TLV,
            NM_SCAN_DURATION,
            NM_ENERGY_LIST_TLV,
        }
        (
            pkts.filter_wpan_src64(commissioner)
            .filter_ipv6_2dsts(LEADER_ALOC, leader_rloc)
            .filter_coap_request(MGMT_ACTIVE_GET_URI)
            .filter(has_tlvs(request_with_disallowed_tlvs))
            .must_next()
        )

        # Step 7: Leader sends a MGMT_ACTIVE_GET.rsp to Commissioner with
        #         the following format:
        #         CoAP Response Code
        #             2.04 Changed
        #         CoAP Payload
        #             Channel TLV
        #             Network Mesh-Local Prefix TLV
        #             Network Name TLV
        channel_prefix_name_tlvs = {
            NM_CHANNEL_TLV,
            NM_NETWORK_MESH_LOCAL_PREFIX_TLV,
            NM_NETWORK_NAME_TLV,
        }
        (
            pkts.filter_ipv6_src_dst(leader_rloc, commissioner_rloc)
            .filter_coap_ack(MGMT_ACTIVE_GET_URI)
            .filter(has_tlvs(channel_prefix_name_tlvs))
            .must_next()
        )

        # Step 8: Verify connectivity by sending an ICMPv6 Echo Request to the
        #         DUT mesh local address
        # Commissioner -> Leader first, then Leader -> Commissioner; each echo
        # reply must carry the identifier of its request and travel the
        # reverse path.
        ping_directions = (
            (commissioner_rloc, leader_rloc),
            (leader_rloc, commissioner_rloc),
        )
        for src_rloc, dst_rloc in ping_directions:
            echo_request = (
                pkts.filter_ping_request()
                .filter_ipv6_src_dst(src_rloc, dst_rloc)
                .must_next()
            )
            (
                pkts.filter_ping_reply(identifier=echo_request.icmpv6.echo.identifier)
                .filter_ipv6_src_dst(dst_rloc, src_rloc)
                .must_next()
            )


if __name__ == '__main__':
    unittest.main()