#!/usr/bin/env python3
#
#  Copyright (c) 2020, The OpenThread Authors.
#  All rights reserved.
#
#  Redistribution and use in source and binary forms, with or without
#  modification, are permitted provided that the following conditions are met:
#  1. Redistributions of source code must retain the above copyright
#     notice, this list of conditions and the following disclaimer.
#  2. Redistributions in binary form must reproduce the above copyright
#     notice, this list of conditions and the following disclaimer in the
#     documentation and/or other materials provided with the distribution.
#  3. Neither the name of the copyright holder nor the
#     names of its contributors may be used to endorse or promote products
#     derived from this software without specific prior written permission.
#
#  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
#  AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
#  IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
#  ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
#  LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
#  CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
#  SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
#  INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
#  CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
#  ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
#  POSSIBILITY OF SUCH DAMAGE.
#

import unittest

import config
import mesh_cop
import thread_cert
from pktverify.consts import MGMT_PENDING_GET_URI, MGMT_PENDING_SET_URI, NM_CHANNEL_TLV, NM_PAN_ID_TLV, NM_NETWORK_NAME_TLV, NM_NETWORK_MESH_LOCAL_PREFIX_TLV, NM_PSKC_TLV, NM_ACTIVE_TIMESTAMP_TLV, NM_CHANNEL_MASK_TLV, NM_EXTENDED_PAN_ID_TLV, NM_NETWORK_KEY_TLV, NM_SECURITY_POLICY_TLV, NM_PENDING_TIMESTAMP_TLV, NM_DELAY_TIMER_TLV, LEADER_ALOC
from pktverify.packet_verifier import PacketVerifier
from pktverify.null_field import nullField

# Node identifiers: used as the keys of TOPOLOGY and to index self.nodes.
COMMISSIONER = 1
LEADER = 2

# Test Purpose and Description:
# -----------------------------
# The purpose of this test case is to verify Leader's and active Commissioner's behavior via
# MGMT_PENDING_GET request and response.
#
# Test Topology:
# -------------
# Commissioner
#    |
#  Leader
#
# DUT Types:
# ----------
#  Leader
#  Commissioner


class Cert_9_2_19_PendingDatasetGet(thread_cert.TestCase):
    """Certification test 9.2.19: MGMT_PENDING_GET between Commissioner and Leader.

    ``test`` drives a two-node network (one Leader, one Commissioner) through
    a sequence of MGMT_PENDING_GET / MGMT_PENDING_SET requests, and ``verify``
    checks the resulting packets in the order they are expected to appear.
    """
    SUPPORT_NCP = False

    # Two nodes that only allow-list each other.
    TOPOLOGY = {
        COMMISSIONER: {
            'name': 'COMMISSIONER',
            'mode': 'rdn',
            'allowlist': [LEADER]
        },
        LEADER: {
            'name': 'LEADER',
            'mode': 'rdn',
            'allowlist': [COMMISSIONER]
        },
    }

    def test(self):
        """Form the network and issue the pending-dataset requests.

        Sequence: start the Leader, attach the Commissioner as a router, start
        the commissioner role, then send
        pending-get, pending-set, pending-get, pending-get (PAN ID only),
        and a final pending-get after a long wait.
        """
        self.nodes[LEADER].start()
        self.simulator.go(config.LEADER_STARTUP_DELAY)
        self.assertEqual(self.nodes[LEADER].get_state(), 'leader')

        self.nodes[COMMISSIONER].start()
        self.simulator.go(config.ROUTER_STARTUP_DELAY)
        self.assertEqual(self.nodes[COMMISSIONER].get_state(), 'router')
        # Return value intentionally discarded.
        # NOTE(review): presumably this drains the Leader's queued messages - confirm.
        self.simulator.get_messages_sent_by(LEADER)

        self.collect_rlocs()
        self.collect_rloc16s()

        leader_rloc = self.nodes[LEADER].get_rloc()

        self.nodes[COMMISSIONER].commissioner_start()
        self.simulator.go(3)

        # First get: issued before any pending-set has been sent.
        self.nodes[COMMISSIONER].send_mgmt_pending_get()
        self.simulator.go(2)

        self.nodes[COMMISSIONER].send_mgmt_pending_set(
            active_timestamp=60,
            delay_timer=60000,
            panid=0xAFCE,
            pending_timestamp=30,
        )
        self.simulator.go(2)

        # Second get: no TLVs requested.
        self.nodes[COMMISSIONER].send_mgmt_pending_get()
        self.simulator.go(2)

        # Third get: addressed to the Leader RLOC and asking for the PAN ID TLV only.
        self.nodes[COMMISSIONER].send_mgmt_pending_get(leader_rloc, [mesh_cop.TlvType.PAN_ID])
        # NOTE(review): presumably long enough for the delay timer set above
        # to expire before the final get - confirm the timer's unit.
        self.simulator.go(92)

        # Final get: verify() expects an empty response to this one.
        self.nodes[COMMISSIONER].send_mgmt_pending_get()
        self.simulator.go(2)

    def verify(self, pv):
        """Check the captured packets against the expected request/response steps.

        Each ``must_next()`` call is made on the same ``pkts`` object, so the
        order of the checks below matters and must follow the order in which
        ``test`` sent the requests.

        NOTE(review): several step comments say "Active Operational Dataset"
        although the requests use the pending-get URI - confirm the wording
        against the test specification.
        NOTE(review): Step 9 reads the TLV types from ``p.coap.tlv`` and Step 12
        tests ``p.thread_meshcop.tlv`` for emptiness, while the sibling steps
        do the opposite - kept as-is, confirm whether this is intentional.

        Args:
            pv: packet verifier; this method reads ``pv.pkts``, ``pv.summary``
                and ``pv.vars`` and calls ``pv.verify_attached``.
        """
        pkts = pv.pkts
        pv.summary.show()

        LEADER_RLOC = pv.vars['LEADER_RLOC']
        COMMISSIONER = pv.vars['COMMISSIONER']
        COMMISSIONER_RLOC = pv.vars['COMMISSIONER_RLOC']

        # Step 1: Ensure topology is formed correctly
        pv.verify_attached('COMMISSIONER', 'LEADER')

        # Step 2: Commissioner sends a MGMT_PENDING_GET.req to Leader Anycast
        #         or Routing Locator:
        #         CoAP Request URI
        #             CON POST coap://<L>:MM/c/pg
        #         CoAP Payload
        #             <empty> - get all Active Operational Dataset parameters
        _mgmt_get_pkt = pkts.filter_wpan_src64(COMMISSIONER).\
            filter_ipv6_2dsts(LEADER_ALOC, LEADER_RLOC).\
            filter_coap_request(MGMT_PENDING_GET_URI).\
            filter(lambda p: p.coap.tlv.type is nullField).\
            must_next()

        # Step 3: Leader sends a MGMT_PENDING_GET.rsp to Commissioner with
        #         the following format:
        #         CoAP Response Code
        #             2.04 Changed
        #         CoAP Payload
        #             <empty> - (no Pending Operational Dataset)
        pkts.filter_ipv6_src_dst(_mgmt_get_pkt.ipv6.dst, COMMISSIONER_RLOC).\
            filter_coap_ack(MGMT_PENDING_GET_URI).\
            filter(lambda p: p.coap.tlv.type is nullField).\
            must_next()

        # Step 4: Commissioner sends a MGMT_PENDING_SET.req to Leader Anycast
        #         or Routing Locator:
        #         CoAP Request URI
        #             CON POST coap://<L>:MM/c/ps
        #         CoAP Payload
        #             Active Timestamp TLV: 60s
        #             Commissioner Session ID TLV (valid)
        #             Delay Timer TLV: 1 minute
        #             Pending Timestamp TLV: 30s
        #             PAN ID TLV: 0xAFCE (new value)
        _mgmt_pending_set_pkt = pkts.filter_wpan_src64(COMMISSIONER).\
            filter_ipv6_2dsts(LEADER_ALOC, LEADER_RLOC).\
            filter_coap_request(MGMT_PENDING_SET_URI).\
            filter(lambda p: {
                              NM_ACTIVE_TIMESTAMP_TLV,
                              NM_PENDING_TIMESTAMP_TLV,
                              NM_DELAY_TIMER_TLV,
                              NM_PAN_ID_TLV
                             } <= set(p.thread_meshcop.tlv.type) and\
                   p.thread_meshcop.tlv.active_tstamp == 60 and\
                   p.thread_meshcop.tlv.pan_id == [0xafce] and\
                   p.thread_meshcop.tlv.delay_timer == 60000
                   ).\
            must_next()

        # Step 5: Leader sends a MGMT_PENDING_SET.rsp to Commissioner with
        #         the following format:
        #         CoAP Response Code
        #             2.04 Changed
        #         CoAP Payload
        #             State TLV (value = Accept (01))
        pkts.filter_ipv6_src_dst(_mgmt_pending_set_pkt.ipv6.dst, COMMISSIONER_RLOC).\
            filter_coap_ack(MGMT_PENDING_SET_URI).\
            filter(lambda p:
                   p.thread_meshcop.tlv.state == 1
                   ).\
            must_next()

        # Step 6: Commissioner sends a MGMT_PENDING_GET.req to Leader Anycast
        #         or Routing Locator:
        #         CoAP Request URI
        #             CON POST coap://<L>:MM/c/pg
        #         CoAP Payload
        #             <empty> - get all Active Operational Dataset parameters
        _mgmt_pending_get_pkt = pkts.filter_wpan_src64(COMMISSIONER).\
            filter_ipv6_2dsts(LEADER_ALOC, LEADER_RLOC).\
            filter_coap_request(MGMT_PENDING_GET_URI).\
            filter(lambda p: p.coap.tlv.type is nullField).\
            must_next()

        # Step 7: Leader sends a MGMT_PENDING_GET.rsp to Commissioner with
        #         the following format:
        #         CoAP Response Code
        #             2.04 Changed
        #         CoAP Payload
        #             (entire Active Operational Dataset)
        #             Active Timestamp TLV
        #             Channel TLV
        #             Channel Mask TLV
        #             Delay Timer TLV
        #             Extended PAN ID TLV
        #             Network Mesh-Local Prefix TLV
        #             Network Key TLV
        #             Network Name TLV
        #             PAN ID TLV
        #             Pending Timestamp TLV
        #             PSKc TLV
        #             Security Policy TLV
        pkts.filter_ipv6_src_dst(_mgmt_pending_get_pkt.ipv6.dst, COMMISSIONER_RLOC).\
            filter_coap_ack(MGMT_PENDING_GET_URI).\
            filter(lambda p: {
                              NM_ACTIVE_TIMESTAMP_TLV,
                              NM_CHANNEL_TLV,
                              NM_CHANNEL_MASK_TLV,
                              NM_DELAY_TIMER_TLV,
                              NM_EXTENDED_PAN_ID_TLV,
                              NM_NETWORK_MESH_LOCAL_PREFIX_TLV,
                              NM_NETWORK_KEY_TLV,
                              NM_NETWORK_NAME_TLV,
                              NM_PAN_ID_TLV,
                              NM_PENDING_TIMESTAMP_TLV,
                              NM_PSKC_TLV,
                              NM_SECURITY_POLICY_TLV
                             } <= set(p.thread_meshcop.tlv.type)
                   ).\
            must_next()

        # Step 8: Commissioner sends a MGMT_PENDING_GET.req to Leader Anycast
        #         or Routing Locator:
        #         CoAP Request URI
        #             CON POST coap://<L>:MM/c/pg
        #         CoAP Payload
        #             PAN ID TLV
        _mgmt_pending_get_pkt = pkts.filter_wpan_src64(COMMISSIONER).\
            filter_ipv6_2dsts(LEADER_ALOC, LEADER_RLOC).\
            filter_coap_request(MGMT_PENDING_GET_URI).\
            filter(lambda p: {
                              NM_PAN_ID_TLV
                             } <= set(p.thread_meshcop.tlv.type)
                   ).\
            must_next()

        # Step 9: Leader sends a MGMT_PENDING_GET.rsp to Commissioner with
        #         the following format:
        #         CoAP Response Code
        #             2.04 Changed
        #         CoAP Payload
        #             PAN ID TLV
        #             Delay Timer TLV
        # The delay timer must already be below the 60000 that was set in Step 4.
        pkts.filter_ipv6_src_dst(_mgmt_pending_get_pkt.ipv6.dst, COMMISSIONER_RLOC).\
            filter_coap_ack(MGMT_PENDING_GET_URI).\
            filter(lambda p: {
                              NM_DELAY_TIMER_TLV,
                              NM_PAN_ID_TLV
                             } <= set(p.coap.tlv.type) and\
                   p.thread_meshcop.tlv.pan_id == [0xafce] and\
                   p.thread_meshcop.tlv.delay_timer < 60000
                   ).\
            must_next()

        # Step 11: Commissioner sends a MGMT_PENDING_GET.req to Leader Anycast
        #         or Routing Locator:
        #         CoAP Request URI
        #             CON POST coap://<L>:MM/c/pg
        #         CoAP Payload
        #             <empty> - get all Active Operational Dataset parameters
        _mgmt_pending_get_pkt = pkts.filter_wpan_src64(COMMISSIONER).\
            filter_ipv6_2dsts(LEADER_ALOC, LEADER_RLOC).\
            filter_coap_request(MGMT_PENDING_GET_URI).\
            filter(lambda p: p.coap.tlv.type is nullField).\
            must_next()

        # Step 12: Leader sends a MGMT_PENDING_GET.rsp to Commissioner with
        #         the following format:
        #         CoAP Response Code
        #             2.04 Changed
        #         CoAP Payload
        #             <empty> - (no Pending Operational Dataset)
        pkts.filter_ipv6_src_dst(_mgmt_pending_get_pkt.ipv6.dst, COMMISSIONER_RLOC).\
            filter_coap_ack(MGMT_PENDING_GET_URI).\
            filter(lambda p: p.thread_meshcop.tlv.type is nullField).\
            must_next()


# Run the test case through unittest when this file is executed directly.
if __name__ == '__main__':
    unittest.main()