#!/usr/bin/env python3
#
# Copyright (c) 2016, The OpenThread Authors.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
# 1. Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
#    notice, this list of conditions and the following disclaimer in the
#    documentation and/or other materials provided with the distribution.
# 3. Neither the name of the copyright holder nor the
#    names of its contributors may be used to endorse or promote products
#    derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#

import unittest

import config
import thread_cert
from pktverify.consts import MLE_CHILD_ID_REQUEST, MLE_PARENT_REQUEST, MLE_CHILD_ID_RESPONSE, MLE_ANNOUNCE, CHANNEL_TLV, PAN_ID_TLV, ACTIVE_TIMESTAMP_TLV, LINK_LOCAL_ALL_NODES_MULTICAST_ADDRESS
from pktverify.packet_verifier import PacketVerifier

# Node identifiers used as keys of TOPOLOGY and of self.nodes.
LEADER1 = 1
ROUTER1 = 2
LEADER2 = 3
MED = 4

# Active dataset shared by LEADER_1 and ROUTER_1.
DATASET1_TIMESTAMP = 20
DATASET1_CHANNEL = 11
DATASET1_PANID = 0xface

# Active dataset of LEADER_2; MED starts on the same channel and PAN ID.
DATASET2_TIMESTAMP = 10
DATASET2_CHANNEL = 12
DATASET2_PANID = 0xafce


def _is_link_local(ipaddr):
    """Return True when the textual address `ipaddr` starts with 'fe80'."""
    return ipaddr.startswith('fe80')


def _is_valid_announce(p):
    """Packet predicate shared by the LEADER_2 and MED MLE Announce checks.

    The packet must carry exactly the Channel, PAN ID and Active Timestamp
    TLVs, use destination PAN ID 0xFFFF, and be secured with Key ID Mode 2
    using key source 0x00000000ffffffff.
    """
    return ({CHANNEL_TLV, PAN_ID_TLV, ACTIVE_TIMESTAMP_TLV} == set(p.mle.tlv.type) and p.wpan.dst_pan == 0xffff and
            p.wpan.aux_sec.key_id_mode == 0x2 and p.wpan.aux_sec.key_source == 0x00000000ffffffff)


class Cert_9_2_12_Announce(thread_cert.TestCase):
    """Announce test: two partitions on different channels / PAN IDs.

    LEADER_1 and ROUTER_1 use dataset 1 (timestamp 20, channel 11, PAN ID
    0xface). LEADER_2 uses dataset 2 (timestamp 10, channel 12, PAN ID
    0xafce) and MED starts on dataset 2's channel and PAN ID. After LEADER_1
    triggers an announce through ROUTER_1, LEADER_2 is expected to end up as
    a router and MED as a child, both reachable from LEADER_1.
    """

    SUPPORT_NCP = False

    TOPOLOGY = {
        LEADER1: {
            'name': 'LEADER_1',
            'active_dataset': {
                'timestamp': DATASET1_TIMESTAMP,
                'panid': DATASET1_PANID,
                'channel': DATASET1_CHANNEL
            },
            'mode': 'rdn',
            'allowlist': [ROUTER1]
        },
        ROUTER1: {
            'name': 'ROUTER_1',
            'active_dataset': {
                'timestamp': DATASET1_TIMESTAMP,
                'panid': DATASET1_PANID,
                'channel': DATASET1_CHANNEL
            },
            'mode': 'rdn',
            'allowlist': [LEADER1, LEADER2]
        },
        LEADER2: {
            'name': 'LEADER_2',
            'active_dataset': {
                'timestamp': DATASET2_TIMESTAMP,
                'panid': DATASET2_PANID,
                'channel': DATASET2_CHANNEL
            },
            'mode': 'rdn',
            'allowlist': [MED, ROUTER1]
        },
        MED: {
            'name': 'MED',
            'channel': DATASET2_CHANNEL,
            'is_mtd': True,
            'mode': 'rn',
            'panid': DATASET2_PANID,
            'allowlist': [LEADER2]
        },
    }

    def test(self):
        """Form both partitions, trigger the announce, then check roles and reachability."""
        # Partition 1: LEADER_1 (with commissioner) and ROUTER_1.
        self.nodes[LEADER1].start()
        self.simulator.go(config.LEADER_STARTUP_DELAY)
        self.assertEqual(self.nodes[LEADER1].get_state(), 'leader')
        self.nodes[LEADER1].commissioner_start()
        self.simulator.go(3)

        self.nodes[ROUTER1].start()
        self.simulator.go(config.ROUTER_STARTUP_DELAY)
        self.assertEqual(self.nodes[ROUTER1].get_state(), 'router')

        # Partition 2: LEADER_2 is forced to leader and MED attaches to it.
        self.nodes[LEADER2].start()
        self.nodes[LEADER2].set_state('leader')
        self.assertEqual(self.nodes[LEADER2].get_state(), 'leader')

        self.nodes[MED].start()
        self.simulator.go(5)
        self.assertEqual(self.nodes[MED].get_state(), 'child')

        # Use ROUTER_1's first non-link-local address as the announce
        # destination. Fail explicitly when none exists instead of silently
        # falling back to whatever address the scan happened to end on.
        router1_addr = next((addr for addr in self.nodes[ROUTER1].get_addrs() if not _is_link_local(addr)), None)
        self.assertIsNotNone(router1_addr, 'ROUTER_1 has no non-link-local address')

        # NOTE(review): 0x1000 == 1 << DATASET2_CHANNEL, so the first argument
        # is presumably a channel mask selecting channel 12 - confirm against
        # the announce_begin signature.
        self.nodes[LEADER1].announce_begin(0x1000, 1, 1000, router1_addr)
        self.simulator.go(30)
        self.assertEqual(self.nodes[LEADER2].get_state(), 'router')
        self.assertEqual(self.nodes[MED].get_state(), 'child')

        self.collect_rlocs()
        # Every non-link-local MED address must answer a ping from LEADER_1;
        # require at least one so the check cannot pass vacuously.
        med_addrs = [addr for addr in self.nodes[MED].get_addrs() if not _is_link_local(addr)]
        self.assertTrue(med_addrs, 'MED has no non-link-local address to ping')
        for ipaddr in med_addrs:
            self.assertTrue(self.nodes[LEADER1].ping(ipaddr))

    def verify(self, pv):
        """Check the captured packets against the expected message sequence.

        `pv` supplies the packet stream (`pv.pkts`), the capture summary
        (`pv.summary`) and the per-node variables (`pv.vars`) read below.
        The checks are order-sensitive: apart from the one made on a copy,
        each is applied to the same `pkts` object in sequence.
        """
        pkts = pv.pkts
        pv.summary.show()

        LEADER_1 = pv.vars['LEADER_1']
        ROUTER_1 = pv.vars['ROUTER_1']
        LEADER_2 = pv.vars['LEADER_2']
        LEADER_1_RLOC = pv.vars['LEADER_1_RLOC']
        # Within this method MED shadows the module-level MED node id.
        MED = pv.vars['MED']
        MED_RLOC = pv.vars['MED_RLOC']

        # Step 1: Ensure the topology is formed correctly: each parent sends
        #         its Child ID Response on its own PAN ID.
        pkts.filter_wpan_src64(LEADER_1).filter_wpan_dst64(ROUTER_1).filter_mle_cmd(
            MLE_CHILD_ID_RESPONSE).must_next().must_verify(lambda p: p.wpan.dst_pan == DATASET1_PANID)
        # The second check runs on a copy, so it does not advance `pkts`.
        pkts.copy().filter_wpan_src64(LEADER_2).filter_wpan_dst64(MED).filter_mle_cmd(
            MLE_CHILD_ID_RESPONSE).must_next().must_verify(lambda p: p.wpan.dst_pan == DATASET2_PANID)

        # Step 4: LEADER_2 MUST send an MLE Child ID Request to ROUTER_1 on
        #         its new channel (checked here via the dataset 1 PAN ID).
        #         LEADER_2 MUST also send an MLE Announce message whose
        #         IEEE 802.15.4 MAC Destination PAN ID is 0xFFFF and which
        #         is secured using Key ID Mode 2.
        pkts.filter_wpan_src64(LEADER_2).filter_wpan_dst64(ROUTER_1).filter_mle_cmd(
            MLE_CHILD_ID_REQUEST).must_next().must_verify(lambda p: p.wpan.dst_pan == DATASET1_PANID)

        pkts.filter_wpan_src64(LEADER_2).filter_ipv6_dst(LINK_LOCAL_ALL_NODES_MULTICAST_ADDRESS).filter_mle_cmd(
            MLE_ANNOUNCE).must_next().must_verify(_is_valid_announce)

        # Step 5: MED MUST send an MLE Child ID Request to LEADER_2 on its
        #         new channel (checked here via the dataset 1 PAN ID).
        #         MED MUST also send an MLE Announce message whose
        #         IEEE 802.15.4 MAC Destination PAN ID is 0xFFFF and which
        #         is secured using Key ID Mode 2.
        pkts.filter_wpan_src64(MED).filter_wpan_dst64(LEADER_2).filter_mle_cmd(
            MLE_CHILD_ID_REQUEST).must_next().must_verify(lambda p: p.wpan.dst_pan == DATASET1_PANID)

        pkts.filter_wpan_src64(MED).filter_ipv6_dst(LINK_LOCAL_ALL_NODES_MULTICAST_ADDRESS).filter_mle_cmd(
            MLE_ANNOUNCE).must_next().must_verify(_is_valid_announce)

        # Step 6: MED MUST respond with an ICMPv6 Echo Reply
        pkts.filter_ping_reply().filter_ipv6_src_dst(MED_RLOC, LEADER_1_RLOC).must_next()


if __name__ == '__main__':
    unittest.main()