1#!/usr/bin/env python3 2# 3# Copyright (c) 2016, The OpenThread Authors. 4# All rights reserved. 5# 6# Redistribution and use in source and binary forms, with or without 7# modification, are permitted provided that the following conditions are met: 8# 1. Redistributions of source code must retain the above copyright 9# notice, this list of conditions and the following disclaimer. 10# 2. Redistributions in binary form must reproduce the above copyright 11# notice, this list of conditions and the following disclaimer in the 12# documentation and/or other materials provided with the distribution. 13# 3. Neither the name of the copyright holder nor the 14# names of its contributors may be used to endorse or promote products 15# derived from this software without specific prior written permission. 16# 17# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 18# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 19# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 20# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE 21# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 22# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 23# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 24# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 25# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 26# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 27# POSSIBILITY OF SUCH DAMAGE. 
#

import unittest

import config
import thread_cert
from pktverify.consts import MLE_CHILD_ID_REQUEST, MGMT_PANID_QUERY, MGMT_PANID_CONFLICT, MGMT_ED_REPORT, NM_COMMISSIONER_SESSION_ID_TLV, NM_CHANNEL_MASK_TLV, NM_PAN_ID_TLV, REALM_LOCAL_All_THREAD_NODES_MULTICAST_ADDRESS
from pktverify.packet_verifier import PacketVerifier

COMMISSIONER = 1
LEADER1 = 2
ROUTER1 = 3
LEADER2 = 4

# PAN ID used by LEADER_2's network; the Commissioner queries for this same
# value, so the topology entry and the queries must stay in sync.
CONFLICT_PANID = 0xdead


class Cert_9_2_14_PanIdQuery(thread_cert.TestCase):
    """PAN ID query test (case 9.2.14, per the class name).

    Allowlists chain the nodes as
    COMMISSIONER <-> LEADER_1 <-> ROUTER <-> LEADER_2, where LEADER_2 is
    configured with PAN ID ``CONFLICT_PANID``. The Commissioner issues PAN ID
    queries for that PAN ID, and ``verify`` checks that ROUTER answers each one
    with an MGMT_PANID_CONFLICT request.
    """

    SUPPORT_NCP = False

    TOPOLOGY = {
        COMMISSIONER: {
            'name': 'COMMISSIONER',
            'mode': 'rdn',
            'allowlist': [LEADER1]
        },
        LEADER1: {
            'name': 'LEADER_1',
            'mode': 'rdn',
            'allowlist': [COMMISSIONER, ROUTER1]
        },
        ROUTER1: {
            'name': 'ROUTER',
            'mode': 'rdn',
            'allowlist': [LEADER1, LEADER2]
        },
        LEADER2: {
            'name': 'LEADER_2',
            'mode': 'rdn',
            'panid': CONFLICT_PANID,
            'allowlist': [ROUTER1]
        },
    }

    def test(self):
        """Bring up the topology, send the PAN ID queries and ping ROUTER.

        Nodes are started one at a time and each must reach its expected state
        before the next one starts. The Commissioner then sends one PAN ID
        query to a non-link-local address of ROUTER and one to the multicast
        address ``ff33:0040:fd00:db8:0:0:0:1``, and finally pings ROUTER.
        """
        self.nodes[LEADER1].start()
        self.simulator.go(config.LEADER_STARTUP_DELAY)
        self.assertEqual(self.nodes[LEADER1].get_state(), 'leader')

        self.nodes[COMMISSIONER].start()
        self.simulator.go(config.ROUTER_STARTUP_DELAY)
        self.assertEqual(self.nodes[COMMISSIONER].get_state(), 'router')
        self.nodes[COMMISSIONER].commissioner_start()
        self.simulator.go(3)

        self.nodes[ROUTER1].start()
        self.simulator.go(config.ROUTER_STARTUP_DELAY)
        self.assertEqual(self.nodes[ROUTER1].get_state(), 'router')

        self.nodes[LEADER2].start()
        self.simulator.go(config.LEADER_STARTUP_DELAY)
        self.assertEqual(self.nodes[LEADER2].get_state(), 'leader')

        self.collect_rlocs()

        # Pick the first address of ROUTER that is not link-local (fe80::/...).
        # Fail explicitly instead of silently falling back to a link-local
        # address (or hitting a NameError on an empty address list).
        router_addr = next(
            (addr for addr in self.nodes[ROUTER1].get_addrs() if not addr.startswith('fe80')),
            None,
        )
        self.assertIsNotNone(router_addr, 'ROUTER has no non-link-local address to send the query to')

        # Unicast query to ROUTER.
        self.nodes[COMMISSIONER].panid_query(CONFLICT_PANID, 0xffffffff, router_addr)

        # Multicast query.
        self.nodes[COMMISSIONER].panid_query(CONFLICT_PANID, 0xffffffff, 'ff33:0040:fd00:db8:0:0:0:1')

        self.assertTrue(self.nodes[COMMISSIONER].ping(router_addr))

    def verify(self, pv):
        """Check the captured packets against the expected message sequence.

        Args:
            pv: packet verifier object providing the captured packets
                (``pv.pkts``), the test variables (``pv.vars``) and a summary
                (``pv.summary``).
        """
        pkts = pv.pkts
        pv.summary.show()

        # Lower-case locals so the module-level node ids (COMMISSIONER, ...)
        # are not shadowed inside this method.
        router = pv.vars['ROUTER']
        commissioner = pv.vars['COMMISSIONER']
        router_rloc = pv.vars['ROUTER_RLOC']
        commissioner_rloc = pv.vars['COMMISSIONER_RLOC']
        _rpkts = pkts.filter_wpan_src64(router)
        _cpkts = pkts.filter_wpan_src64(commissioner)

        # TLV types that must be present in the query and in the conflict
        # messages respectively.
        query_tlvs = {NM_COMMISSIONER_SESSION_ID_TLV, NM_CHANNEL_MASK_TLV, NM_PAN_ID_TLV}
        conflict_tlvs = {NM_CHANNEL_MASK_TLV, NM_PAN_ID_TLV}

        # Step 1: Ensure the topology is formed correctly
        _rpkts.filter_mle_cmd(MLE_CHILD_ID_REQUEST).must_next()

        # Step 2: Commissioner MUST send a unicast MGMT_PANID_QUERY.qry to Router_1
        _cpkts.filter_ipv6_dst(router_rloc).filter_coap_request(MGMT_PANID_QUERY).must_next().must_verify(
            lambda p: query_tlvs <= set(p.thread_meshcop.tlv.type))

        # Step 3: Router MUST send MGMT_PANID_CONFLICT.ans to the Commissioner
        _rpkts.range(_cpkts.index).filter_ipv6_dst(commissioner_rloc).filter_coap_request(
            MGMT_PANID_CONFLICT).must_next().must_verify(lambda p: conflict_tlvs <= set(p.thread_meshcop.tlv.type))

        # Step 4: Commissioner MUST send a multicast MGMT_PANID_QUERY.qry
        _cpkts.filter_ipv6_dst(REALM_LOCAL_All_THREAD_NODES_MULTICAST_ADDRESS).filter_coap_request(
            MGMT_PANID_QUERY).must_next().must_verify(lambda p: query_tlvs <= set(p.thread_meshcop.tlv.type))

        # Step 5: Router MUST send MGMT_PANID_CONFLICT.ans to the Commissioner
        _rpkts.range(_cpkts.index).filter_ipv6_dst(commissioner_rloc).filter_coap_request(
            MGMT_PANID_CONFLICT).must_next().must_verify(lambda p: conflict_tlvs <= set(p.thread_meshcop.tlv.type))

        # Step 6: Router MUST respond with an ICMPv6 Echo Reply
        _rpkts.filter_ipv6_dst(commissioner_rloc).filter_ping_reply().must_next()


if __name__ == '__main__':
    unittest.main()