• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2#
3#  Copyright (c) 2020, The OpenThread Authors.
4#  All rights reserved.
5#
6#  Redistribution and use in source and binary forms, with or without
7#  modification, are permitted provided that the following conditions are met:
8#  1. Redistributions of source code must retain the above copyright
9#     notice, this list of conditions and the following disclaimer.
10#  2. Redistributions in binary form must reproduce the above copyright
11#     notice, this list of conditions and the following disclaimer in the
12#     documentation and/or other materials provided with the distribution.
13#  3. Neither the name of the copyright holder nor the
14#     names of its contributors may be used to endorse or promote products
15#     derived from this software without specific prior written permission.
16#
17#  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18#  AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19#  IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20#  ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
21#  LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22#  CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23#  SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24#  INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25#  CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26#  ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27#  POSSIBILITY OF SUCH DAMAGE.
28#
29
30import unittest
31
32import config
33import thread_cert
34from pktverify.consts import MLE_DATA_RESPONSE, MGMT_COMMISSIONER_GET_URI, NM_CHANNEL_TLV, NM_COMMISSIONER_ID_TLV, NM_COMMISSIONER_SESSION_ID_TLV, NM_STEERING_DATA_TLV, NM_BORDER_AGENT_LOCATOR_TLV, LEADER_DATA_TLV, NETWORK_DATA_TLV, ACTIVE_TIMESTAMP_TLV, SOURCE_ADDRESS_TLV, NWD_COMMISSIONING_DATA_TLV, NM_PAN_ID_TLV, NM_NETWORK_NAME_TLV
35from pktverify.packet_verifier import PacketVerifier
36from pktverify.null_field import nullField
37
# Node identifiers; they key both the TOPOLOGY table and self.nodes.
COMMISSIONER, LEADER = 1, 2
40
41# Test Purpose and Description:
42# -----------------------------
43# The purpose of this test case is to verify Leader's and active Commissioner's behavior via
44# MGMT_COMMISSIONER_GET request and response
45#
46# Test Topology:
47# -------------
48# Commissioner
49#    |
50#  Leader
51#
52# DUT Types:
53# ----------
54#  Leader
55#  Commissioner
56
57
class Cert_9_2_01_MGMTCommissionerGet(thread_cert.TestCase):
    """Certification case 9.2.1: MGMT_COMMISSIONER_GET request and response.

    Two nodes take part: a Leader and a router that is started as
    Commissioner. ``test`` drives the Commissioner through four
    MGMT_COMMISSIONER_GET queries followed by a ping in each direction;
    ``verify`` then walks the captured packets step by step.
    """

    # Harness flags read by thread_cert.TestCase (presumably "skip NCP runs"
    # and "use the simulator message factory" -- confirm against the base class).
    SUPPORT_NCP = False
    USE_MESSAGE_FACTORY = True

    # Both nodes run in mode 'rdn' and allowlist only each other.
    TOPOLOGY = {
        COMMISSIONER: {'name': 'COMMISSIONER', 'mode': 'rdn', 'allowlist': [LEADER]},
        LEADER: {'name': 'LEADER', 'mode': 'rdn', 'allowlist': [COMMISSIONER]},
    }

    def test(self):
        """Form the network, start the Commissioner and issue the GET queries."""
        leader_node = self.nodes[LEADER]
        leader_node.start()
        self.simulator.go(config.LEADER_STARTUP_DELAY)
        self.assertEqual(leader_node.get_state(), 'leader')

        commissioner_node = self.nodes[COMMISSIONER]
        commissioner_node.start()
        self.simulator.go(config.ROUTER_STARTUP_DELAY)
        self.assertEqual(commissioner_node.get_state(), 'router')

        # Presumably these record the addresses verify() later reads from
        # pv.vars (LEADER_ALOC, *_RLOC) -- confirm in thread_cert.TestCase.
        self.collect_leader_aloc(LEADER)
        self.collect_rlocs()
        self.collect_rloc16s()

        commissioner_node.commissioner_start()
        self.simulator.go(3)

        # One query per entry, in the order verify() expects them:
        #   ()      -> Step 2 (empty payload)
        #   '0b08'  -> Step 4
        #   '0b01'  -> Step 6
        #   '0903'  -> Step 8
        # The simulator is advanced by 5 after every query.
        for mgmtget_args in ((), ('0b08',), ('0b01',), ('0903',)):
            commissioner_node.commissioner_mgmtget(*mgmtget_args)
            self.simulator.go(5)

        # Connectivity check in both directions; both RLOCs are fetched
        # before the first ping is sent.
        leader_rloc = leader_node.get_rloc()
        commissioner_rloc = commissioner_node.get_rloc()
        self.assertTrue(commissioner_node.ping(leader_rloc))
        self.simulator.go(1)
        self.assertTrue(leader_node.ping(commissioner_rloc))

    def verify(self, pv):
        """Check the captured packets against the ten steps of the test case.

        Every step builds a fresh filter chain from ``pv.pkts`` and ends in
        ``must_next()``, so the steps have to stay in this order.

        Args:
            pv: packet verifier; this method uses its ``pkts``, ``summary``,
                ``vars`` and ``verify_attached`` members.
        """
        pkts = pv.pkts
        pv.summary.show()

        # Lowercase locals keep the module-level node ids unshadowed.
        # `_leader` is fetched but not referenced by any step below.
        _leader, leader_aloc, leader_rloc, commissioner, commissioner_rloc = (
            pv.vars[key]
            for key in ('LEADER', 'LEADER_ALOC', 'LEADER_RLOC', 'COMMISSIONER', 'COMMISSIONER_RLOC'))

        # Step 1: Ensure topology is formed correctly
        pv.verify_attached('COMMISSIONER', 'LEADER')

        # Step 2: Commissioner sends a MGMT_COMMISSIONER_GET.req to Leader
        #         Anycast or Routing Locator:
        #         CoAP Request URI
        #             CON POST coap://<L>:MM/c/cg
        #         CoAP Payload
        #             <empty> - get all Commissioner Dataset parameters
        get_all_req = (
            pkts.filter_wpan_src64(commissioner)
            .filter_ipv6_2dsts(leader_aloc, leader_rloc)
            .filter_coap_request(MGMT_COMMISSIONER_GET_URI)
            .filter(lambda p: p.coap.tlv.type is nullField)
            .must_next()
        )

        # Step 3: Leader sends a MGMT_COMMISSIONER_GET.rsp to Commissioner:
        #         CoAP Response Code
        #             2.04 Changed
        #         CoAP Payload (entire Commissioner Dataset)
        #             Border Agent Locator TLV
        #             Commissioner Session ID TLV
        #             Steering Data TLV
        full_dataset_types = {
            NM_COMMISSIONER_SESSION_ID_TLV,
            NM_BORDER_AGENT_LOCATOR_TLV,
            NM_STEERING_DATA_TLV,
        }

        def carries_full_dataset(p):
            tlv = p.thread_meshcop.tlv
            return (full_dataset_types <= set(tlv.type) and tlv.ba_locator is not nullField and
                    tlv.commissioner_sess_id is not nullField and tlv.steering_data is not nullField)

        (
            pkts.filter_ipv6_src_dst(get_all_req.ipv6.dst, commissioner_rloc)
            .filter_coap_ack(MGMT_COMMISSIONER_GET_URI)
            .filter(carries_full_dataset)
            .must_next()
        )

        # Step 4: Commissioner sends a MGMT_COMMISSIONER_GET.req to Leader
        #         Anycast or Routing Locator:
        #         CoAP Request URI
        #             CON POST coap://<L>:MM/c/cg
        #         CoAP Payload
        #             Commissioner Session ID TLV
        #             Steering Data TLV
        session_and_steering = {NM_COMMISSIONER_SESSION_ID_TLV, NM_STEERING_DATA_TLV}
        get_req = (
            pkts.filter_wpan_src64(commissioner)
            .filter_ipv6_2dsts(leader_aloc, leader_rloc)
            .filter_coap_request(MGMT_COMMISSIONER_GET_URI)
            .filter(lambda p: session_and_steering <= set(p.thread_meshcop.tlv.type))
            .must_next()
        )

        # Step 5: Leader sends a MGMT_COMMISSIONER_GET.rsp to Commissioner:
        #         CoAP Response Code
        #             2.04 Changed
        #         CoAP Payload (encoded values for the requested parameters)
        #             Commissioner Session ID TLV
        #             Steering Data TLV
        def carries_session_and_steering(p):
            tlv = p.thread_meshcop.tlv
            return (session_and_steering <= set(tlv.type) and tlv.commissioner_sess_id is not nullField and
                    tlv.steering_data is not nullField)

        (
            pkts.filter_ipv6_src_dst(get_req.ipv6.dst, commissioner_rloc)
            .filter_coap_ack(MGMT_COMMISSIONER_GET_URI)
            .filter(carries_session_and_steering)
            .must_next()
        )

        # Step 6: Commissioner sends a MGMT_COMMISSIONER_GET.req to Leader
        #         Anycast or Routing Locator:
        #         CoAP Request URI
        #             CON POST coap://<L>:MM/c/cg
        #         CoAP Payload
        #             Commissioner Session ID TLV
        #             PAN ID TLV
        session_and_pan_id = {NM_COMMISSIONER_SESSION_ID_TLV, NM_PAN_ID_TLV}
        get_req = (
            pkts.filter_wpan_src64(commissioner)
            .filter_ipv6_2dsts(leader_aloc, leader_rloc)
            .filter_coap_request(MGMT_COMMISSIONER_GET_URI)
            .filter(lambda p: session_and_pan_id <= set(p.thread_meshcop.tlv.type))
            .must_next()
        )

        # Step 7: Leader sends a MGMT_COMMISSIONER_GET.rsp to Commissioner:
        #         CoAP Response Code
        #             2.04 Changed
        #         CoAP Payload (encoded values for the requested parameters)
        #             Commissioner Session ID TLV
        #             (PAN ID TLV in Get TLV is ignored)
        session_only = {NM_COMMISSIONER_SESSION_ID_TLV}

        def carries_session_without_pan_id(p):
            tlv = p.thread_meshcop.tlv
            return (session_only <= set(tlv.type) and tlv.commissioner_sess_id is not nullField and
                    tlv.pan_id is nullField)

        (
            pkts.filter_ipv6_src_dst(get_req.ipv6.dst, commissioner_rloc)
            .filter_coap_ack(MGMT_COMMISSIONER_GET_URI)
            .filter(carries_session_without_pan_id)
            .must_next()
        )

        # Step 8: Commissioner sends a MGMT_COMMISSIONER_GET.req to Leader
        #         Anycast or Routing Locator:
        #         CoAP Request URI
        #             CON POST coap://<L>:MM/c/cg
        #         CoAP Payload
        #             Border Agent Locator TLV
        #             Network Name TLV
        locator_and_net_name = {NM_BORDER_AGENT_LOCATOR_TLV, NM_NETWORK_NAME_TLV}
        get_req = (
            pkts.filter_wpan_src64(commissioner)
            .filter_ipv6_2dsts(leader_aloc, leader_rloc)
            .filter_coap_request(MGMT_COMMISSIONER_GET_URI)
            .filter(lambda p: locator_and_net_name <= set(p.thread_meshcop.tlv.type))
            .must_next()
        )

        # Step 9: Leader sends a MGMT_COMMISSIONER_GET.rsp to Commissioner:
        #         CoAP Response Code
        #             2.04 Changed
        #         CoAP Payload (encoded values for the requested parameters)
        #             Border Agent Locator TLV
        #             (Network Name TLV in Get TLV is ignored)
        locator_only = {NM_BORDER_AGENT_LOCATOR_TLV}

        def carries_locator_without_net_name(p):
            tlv = p.thread_meshcop.tlv
            return (locator_only <= set(tlv.type) and tlv.ba_locator is not nullField and
                    tlv.net_name is nullField)

        (
            pkts.filter_ipv6_src_dst(get_req.ipv6.dst, commissioner_rloc)
            .filter_coap_ack(MGMT_COMMISSIONER_GET_URI)
            .filter(carries_locator_without_net_name)
            .must_next()
        )

        # Step 10: Verify connectivity by sending an ICMPv6 Echo Request to
        #          the DUT mesh local address. Each reply is matched to its
        #          request through the echo identifier.
        echo_req = (
            pkts.filter_ping_request()
            .filter_ipv6_src_dst(commissioner_rloc, leader_rloc)
            .must_next()
        )
        (
            pkts.filter_ping_reply(identifier=echo_req.icmpv6.echo.identifier)
            .filter_ipv6_src_dst(leader_rloc, commissioner_rloc)
            .must_next()
        )

        echo_req = (
            pkts.filter_ping_request()
            .filter_ipv6_src_dst(leader_rloc, commissioner_rloc)
            .must_next()
        )
        (
            pkts.filter_ping_reply(identifier=echo_req.icmpv6.echo.identifier)
            .filter_ipv6_src_dst(commissioner_rloc, leader_rloc)
            .must_next()
        )
276
277
if __name__ == '__main__':
    # When executed as a script, hand control to unittest's command-line runner.
    unittest.main()
280