1#!/usr/bin/env python3 2# 3# Copyright (c) 2020, The OpenThread Authors. 4# All rights reserved. 5# 6# Redistribution and use in source and binary forms, with or without 7# modification, are permitted provided that the following conditions are met: 8# 1. Redistributions of source code must retain the above copyright 9# notice, this list of conditions and the following disclaimer. 10# 2. Redistributions in binary form must reproduce the above copyright 11# notice, this list of conditions and the following disclaimer in the 12# documentation and/or other materials provided with the distribution. 13# 3. Neither the name of the copyright holder nor the 14# names of its contributors may be used to endorse or promote products 15# derived from this software without specific prior written permission. 16# 17# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 18# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 19# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 20# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE 21# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 22# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 23# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 24# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 25# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 26# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 27# POSSIBILITY OF SUCH DAMAGE. 28# 29 30import unittest 31 32import config 33import mesh_cop 34import thread_cert 35from pktverify.consts import MGMT_PENDING_GET_URI, MGMT_PENDING_SET_URI, NM_CHANNEL_TLV, NM_PAN_ID_TLV, NM_NETWORK_NAME_TLV, NM_NETWORK_MESH_LOCAL_PREFIX_TLV, NM_PSKC_TLV, NM_ACTIVE_TIMESTAMP_TLV, NM_CHANNEL_MASK_TLV, NM_EXTENDED_PAN_ID_TLV, NM_NETWORK_KEY_TLV, NM_SECURITY_POLICY_TLV, NM_PENDING_TIMESTAMP_TLV, NM_DELAY_TIMER_TLV, LEADER_ALOC 36from pktverify.packet_verifier import PacketVerifier 37from pktverify.null_field import nullField 38 39COMMISSIONER = 1 40LEADER = 2 41 42# Test Purpose and Description: 43# ----------------------------- 44# The purpose of this test case is to verify Leader's and active Commissioner's behavior via 45# MGMT_PENDING_GET request and response 46# 47# Test Topology: 48# ------------- 49# Commissioner 50# | 51# Leader 52# 53# DUT Types: 54# ---------- 55# Leader 56# Commissioner 57 58 59class Cert_9_2_19_PendingDatasetGet(thread_cert.TestCase): 60 SUPPORT_NCP = False 61 62 TOPOLOGY = { 63 COMMISSIONER: { 64 'name': 'COMMISSIONER', 65 'mode': 'rdn', 66 'allowlist': [LEADER] 67 }, 68 LEADER: { 69 'name': 'LEADER', 70 'mode': 'rdn', 71 'allowlist': [COMMISSIONER] 72 }, 73 } 74 75 def test(self): 76 self.nodes[LEADER].start() 77 self.simulator.go(config.LEADER_STARTUP_DELAY) 78 self.assertEqual(self.nodes[LEADER].get_state(), 'leader') 79 80 self.nodes[COMMISSIONER].start() 81 self.simulator.go(config.ROUTER_STARTUP_DELAY) 82 self.assertEqual(self.nodes[COMMISSIONER].get_state(), 'router') 83 self.simulator.get_messages_sent_by(LEADER) 84 85 self.collect_rlocs() 86 self.collect_rloc16s() 87 88 leader_rloc = self.nodes[LEADER].get_rloc() 89 90 self.nodes[COMMISSIONER].commissioner_start() 91 self.simulator.go(3) 92 93 self.nodes[COMMISSIONER].send_mgmt_pending_get() 94 self.simulator.go(2) 95 96 self.nodes[COMMISSIONER].send_mgmt_pending_set( 97 active_timestamp=60, 98 delay_timer=60000, 99 panid=0xAFCE, 100 pending_timestamp=30, 101 ) 102 self.simulator.go(2) 103 104 self.nodes[COMMISSIONER].send_mgmt_pending_get() 105 self.simulator.go(2) 106 107 self.nodes[COMMISSIONER].send_mgmt_pending_get(leader_rloc, [mesh_cop.TlvType.PAN_ID]) 108 self.simulator.go(92) 109 110 self.nodes[COMMISSIONER].send_mgmt_pending_get() 111 self.simulator.go(2) 112 113 def verify(self, pv): 114 pkts = pv.pkts 115 pv.summary.show() 116 117 LEADER = pv.vars['LEADER'] 118 LEADER_RLOC = pv.vars['LEADER_RLOC'] 119 COMMISSIONER = pv.vars['COMMISSIONER'] 120 COMMISSIONER_RLOC = pv.vars['COMMISSIONER_RLOC'] 121 122 # Step 1: Ensure topology is formed correctly 123 pv.verify_attached('COMMISSIONER', 'LEADER') 124 125 # Step 2: Commissioner sends a MGMT_PENDING_GET.req to Leader Anycast 126 # or Routing Locator: 127 # CoAP Request URI 128 # CON POST coap://<L>:MM/c/pg 129 # CoAP Payload 130 # <empty> - get all Active Operational Dataset parameters 131 _mgmt_get_pkt = pkts.filter_wpan_src64(COMMISSIONER).\ 132 filter_ipv6_2dsts(LEADER_ALOC, LEADER_RLOC).\ 133 filter_coap_request(MGMT_PENDING_GET_URI).\ 134 filter(lambda p: p.coap.tlv.type is nullField).\ 135 must_next() 136 137 # Step 3: Leader sends a MGMT_PENDING_GET.rsp to Commissioner with 138 # the following format: 139 # CoAP Response Code 140 # 2.04 Changed 141 # CoAP Payload 142 # <empty> - (no Pending Operational Dataset) 143 pkts.filter_ipv6_src_dst(_mgmt_get_pkt.ipv6.dst, COMMISSIONER_RLOC).\ 144 filter_coap_ack(MGMT_PENDING_GET_URI).\ 145 filter(lambda p: p.coap.tlv.type is nullField).\ 146 must_next() 147 148 # Step 4: Commissioner sends a MGMT_PENDING_SET.req to Leader Anycast 149 # or Routing Locator: 150 # CoAP Request URI 151 # CON POST coap://<L>:MM/c/ps 152 # CoAP Payload 153 # Active Timestamp TLV: 60s 154 # Commissioner Session ID TLV (valid) 155 # Delay Timer TLV: 1 minute 156 # Pending Timestamp TLV: 30s 157 # PAN ID TLV: 0xAFCE (new value) 158 _mgmt_pending_set_pkt = pkts.filter_wpan_src64(COMMISSIONER).\ 159 filter_ipv6_2dsts(LEADER_ALOC, LEADER_RLOC).\ 160 filter_coap_request(MGMT_PENDING_SET_URI).\ 161 filter(lambda p: { 162 NM_ACTIVE_TIMESTAMP_TLV, 163 NM_PENDING_TIMESTAMP_TLV, 164 NM_DELAY_TIMER_TLV, 165 NM_PAN_ID_TLV 166 } <= set(p.thread_meshcop.tlv.type) and\ 167 p.thread_meshcop.tlv.active_tstamp == 60 and\ 168 p.thread_meshcop.tlv.pan_id == [0xafce] and\ 169 p.thread_meshcop.tlv.delay_timer == 60000 170 ).\ 171 must_next() 172 173 # Step 5: Leader sends a MGMT_PENDING_SET.rsp to Commissioner with 174 # the following format: 175 # CoAP Response Code 176 # 2.04 Changed 177 # CoAP Payload 178 # State TLV (value = Accept (01)) 179 pkts.filter_ipv6_src_dst(_mgmt_pending_set_pkt.ipv6.dst, COMMISSIONER_RLOC).\ 180 filter_coap_ack(MGMT_PENDING_SET_URI).\ 181 filter(lambda p: 182 p.thread_meshcop.tlv.state == 1 183 ).\ 184 must_next() 185 186 # Step 6: Commissioner sends a MGMT_PENDING_GET.req to Leader Anycast 187 # or Routing Locator: 188 # CoAP Request URI 189 # CON POST coap://<L>:MM/c/pg 190 # CoAP Payload 191 # <empty> - get all Active Operational Dataset parameters 192 _mgmt_pending_get_pkt = pkts.filter_wpan_src64(COMMISSIONER).\ 193 filter_ipv6_2dsts(LEADER_ALOC, LEADER_RLOC).\ 194 filter_coap_request(MGMT_PENDING_GET_URI).\ 195 filter(lambda p: p.coap.tlv.type is nullField).\ 196 must_next() 197 198 # Step 7: Leader sends a MGMT_PENDING_GET.rsp to Commissioner with 199 # the following format: 200 # CoAP Response Code 201 # 2.04 Changed 202 # CoAP Payload 203 # (entire Active Operational Dataset) 204 # Active Timestamp TLV 205 # Channel TLV 206 # Channel Mask TLV 207 # Delay Timer TLV 208 # Extended PAN ID TLV 209 # Network Mesh-Local Prefix TLV 210 # Network Key TLV 211 # Network Name TLV 212 # PAN ID TLV 213 # Pending Timestamp TLV 214 # PSKc TLV 215 # Security Policy TLV 216 pkts.filter_ipv6_src_dst(_mgmt_pending_get_pkt.ipv6.dst, COMMISSIONER_RLOC).\ 217 filter_coap_ack(MGMT_PENDING_GET_URI).\ 218 filter(lambda p: { 219 NM_ACTIVE_TIMESTAMP_TLV, 220 NM_CHANNEL_TLV, 221 NM_CHANNEL_MASK_TLV, 222 NM_DELAY_TIMER_TLV, 223 NM_EXTENDED_PAN_ID_TLV, 224 NM_NETWORK_MESH_LOCAL_PREFIX_TLV, 225 NM_NETWORK_KEY_TLV, 226 NM_NETWORK_NAME_TLV, 227 NM_PAN_ID_TLV, 228 NM_PENDING_TIMESTAMP_TLV, 229 NM_PSKC_TLV, 230 NM_SECURITY_POLICY_TLV 231 } <= set(p.thread_meshcop.tlv.type) 232 ).\ 233 must_next() 234 235 # Step 8: Commissioner sends a MGMT_PENDING_GET.req to Leader Anycast 236 # or Routing Locator: 237 # CoAP Request URI 238 # CON POST coap://<L>:MM/c/pg 239 # CoAP Payload 240 # PAN ID TLV 241 _mgmt_pending_get_pkt = pkts.filter_wpan_src64(COMMISSIONER).\ 242 filter_ipv6_2dsts(LEADER_ALOC, LEADER_RLOC).\ 243 filter_coap_request(MGMT_PENDING_GET_URI).\ 244 filter(lambda p: { 245 NM_PAN_ID_TLV 246 } <= set(p.thread_meshcop.tlv.type) 247 ).\ 248 must_next() 249 250 # Step 9: Leader sends a MGMT_PENDING_GET.rsp to Commissioner with 251 # the following format: 252 # CoAP Response Code 253 # 2.04 Changed 254 # CoAP Payload 255 # PAN ID TLV 256 # Delay Timer TLV 257 pkts.filter_ipv6_src_dst(_mgmt_pending_get_pkt.ipv6.dst, COMMISSIONER_RLOC).\ 258 filter_coap_ack(MGMT_PENDING_GET_URI).\ 259 filter(lambda p: { 260 NM_DELAY_TIMER_TLV, 261 NM_PAN_ID_TLV 262 } <= set(p.coap.tlv.type) and\ 263 p.thread_meshcop.tlv.pan_id == [0xafce] and\ 264 p.thread_meshcop.tlv.delay_timer < 60000 265 ).\ 266 must_next() 267 268 # Step 11: Commissioner sends a MGMT_PENDING_GET.req to Leader Anycast 269 # or Routing Locator: 270 # CoAP Request URI 271 # CON POST coap://<L>:MM/c/pg 272 # CoAP Payload 273 # <empty> - get all Active Operational Dataset parameters 274 _mgmt_pending_get_pkt = pkts.filter_wpan_src64(COMMISSIONER).\ 275 filter_ipv6_2dsts(LEADER_ALOC, LEADER_RLOC).\ 276 filter_coap_request(MGMT_PENDING_GET_URI).\ 277 filter(lambda p: p.coap.tlv.type is nullField).\ 278 must_next() 279 280 # Step 12: Leader sends a MGMT_PENDING_GET.rsp to Commissioner with 281 # the following format: 282 # CoAP Response Code 283 # 2.04 Changed 284 # CoAP Payload 285 # <empty> - (no Pending Operational Dataset) 286 pkts.filter_ipv6_src_dst(_mgmt_pending_get_pkt.ipv6.dst, COMMISSIONER_RLOC).\ 287 filter_coap_ack(MGMT_PENDING_GET_URI).\ 288 filter(lambda p: p.thread_meshcop.tlv.type is nullField).\ 289 must_next() 290 291 292if __name__ == '__main__': 293 unittest.main() 294