• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2#
3#  Copyright (c) 2020, The OpenThread Authors.
4#  All rights reserved.
5#
6#  Redistribution and use in source and binary forms, with or without
7#  modification, are permitted provided that the following conditions are met:
8#  1. Redistributions of source code must retain the above copyright
9#     notice, this list of conditions and the following disclaimer.
10#  2. Redistributions in binary form must reproduce the above copyright
11#     notice, this list of conditions and the following disclaimer in the
12#     documentation and/or other materials provided with the distribution.
13#  3. Neither the name of the copyright holder nor the
14#     names of its contributors may be used to endorse or promote products
15#     derived from this software without specific prior written permission.
16#
17#  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18#  AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19#  IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20#  ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
21#  LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22#  CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23#  SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24#  INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25#  CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26#  ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27#  POSSIBILITY OF SUCH DAMAGE.
28#
29
30import unittest
31
32import command
33import config
34from mesh_cop import MeshCopState
35import thread_cert
36from pktverify.consts import MLE_CHILD_ID_RESPONSE, MLE_DISCOVERY_RESPONSE, HANDSHAKE_CLIENT_HELLO, HANDSHAKE_HELLO_VERIFY_REQUEST, HANDSHAKE_SERVER_HELLO, HANDSHAKE_SERVER_KEY_EXCHANGE, HANDSHAKE_SERVER_HELLO_DONE, HANDSHAKE_CLIENT_KEY_EXCHANGE, CONTENT_CHANGE_CIPHER_SPEC, CONTENT_HANDSHAKE, CONTENT_APPLICATION_DATA, NM_EXTENDED_PAN_ID_TLV, NM_NETWORK_NAME_TLV, NM_STEERING_DATA_TLV, NM_COMMISSIONER_UDP_PORT_TLV, NM_JOINER_UDP_PORT_TLV, NM_DISCOVERY_RESPONSE_TLV, NM_JOINER_DTLS_ENCAPSULATION_TLV, NM_JOINER_UDP_PORT_TLV, NM_JOINER_IID_TLV, NM_JOINER_ROUTER_LOCATOR_TLV, NM_JOINER_ROUTER_KEK_TLV, RLY_RX_URI, RLY_TX_URI
37from pktverify.packet_verifier import PacketVerifier
38
# Node identifiers: used as keys of TOPOLOGY and as indices into `self.nodes`.
COMMISSIONER = 1
JOINER_ROUTER = 2
JOINER = 3
# Joiner credential registered on the Commissioner for both joining devices
# and passed to `joiner_start` by each of them.
PSKD = 'PSKD01'
# Provisioning URL configured on the Commissioner; the Joiner Router joins with it.
URL_1 = 'www.openthread.org'
# Different provisioning URL, requested by the Joiner.
URL_2 = 'www.wrongurl.org'
45
46# Test Purpose and Description:
47# -----------------------------
48# The purpose of this test case is to verify that the DUT sends and receives
49# relayed DTLS traffic correctly, when the wrong commissioning application is
50# requested by the Joiner.
51# It also verifies that the DUT (as a Joiner Router) sends JOIN_ENT.ntf encrypted
52# with KEK.
53#
54# Test Topology:
55# -------------
56#   Commissioner
57#       |
58# Joiner Router
59#       |
60#    Joiner
61#
62# DUT Types:
63# ----------
64#  Commissioner
65#  Joiner Router
66
67
class Cert_8_2_05_JoinerRouter(thread_cert.TestCase):
    """Cert 8.2.5: commissioning through a Joiner Router with a mismatching URL.

    `test` drives three simulated nodes: the Commissioner forms the network,
    the Joiner Router is commissioned with the provisioning URL configured on
    the Commissioner (URL_1), and the Joiner then starts joining with a
    different URL (URL_2). The Commissioner's commissioning messages are
    checked against `MeshCopState.REJECT`.

    `verify` inspects the captured packets for the attach, the discovery
    response, the Joiner's DTLS ClientHello and the relayed RLY_RX.ntf /
    RLY_TX.ntf messages.
    """
    SUPPORT_NCP = False

    # The Commissioner starts with its own network key; the two joining
    # devices start with a different one, so the key-equality assertions in
    # `test` only hold once a device has received the Commissioner's key.
    TOPOLOGY = {
        COMMISSIONER: {
            'name': 'COMMISSIONER',
            'networkkey': '00112233445566778899aabbccddeeff',
            'mode': 'rdn',
        },
        JOINER_ROUTER: {
            'name': 'JOINER_ROUTER',
            'networkkey': 'deadbeefdeadbeefdeadbeefdeadbeef',
            'mode': 'rdn',
        },
        JOINER: {
            'name': 'JOINER',
            'networkkey': 'deadbeefdeadbeefdeadbeefdeadbeef',
            'mode': 'rdn',
        },
    }

    def test(self):
        # Drives the simulated scenario; the statement order is significant
        # because every `simulator.go()` advances the shared simulation.
        # (Kept as a comment rather than a docstring so that unittest's
        # verbose test description stays unchanged.)
        commissioner = self.nodes[COMMISSIONER]
        joiner_router = self.nodes[JOINER_ROUTER]
        joiner = self.nodes[JOINER]
        simulator = self.simulator

        # Bring up the Commissioner node and wait for it to become leader.
        commissioner.interface_up()
        commissioner.thread_start()
        simulator.go(config.LEADER_STARTUP_DELAY)
        self.assertEqual(commissioner.get_state(), 'leader')

        # Start the commissioner role and register both joining devices.
        commissioner.commissioner_set_provisioning_url(URL_1)
        commissioner.commissioner_start()
        simulator.go(5)
        commissioner.commissioner_add_joiner(joiner_router.get_eui64(), PSKD)
        commissioner.commissioner_add_joiner(joiner.get_eui64(), PSKD)
        simulator.go(5)

        # Commission the Joiner Router with the matching provisioning URL and
        # check that it now holds the Commissioner's network key.
        joiner_router.interface_up()
        joiner_router.joiner_start(PSKD, URL_1)
        simulator.go(10)
        self.assertEqual(
            joiner_router.get_networkkey(),
            commissioner.get_networkkey(),
        )

        joiner_router.thread_start()
        simulator.go(5)
        self.assertEqual(joiner_router.get_state(), 'router')

        # Both the Commissioner and the Joiner only allow-list the Joiner
        # Router, matching the line topology described at the top of the file.
        commissioner.enable_allowlist()
        commissioner.add_allowlist(joiner_router.get_addr64())

        joiner.enable_allowlist()
        joiner.add_allowlist(joiner_router.get_addr64())

        # The Joiner requests a provisioning URL that differs from the one
        # configured on the Commissioner.
        joiner.interface_up()
        joiner.joiner_start(PSKD, URL_2)
        simulator.go(10)

        simulator.read_cert_messages_in_commissioning_log([COMMISSIONER, JOINER_ROUTER])
        commissioner_messages = simulator.get_messages_sent_by(COMMISSIONER)

        self.assertEqual(
            joiner.get_networkkey(),
            commissioner.get_networkkey(),
        )
        # check commissioner sends JOIN_FIN.rsp with reject
        command.check_commissioner_commissioning_messages(commissioner_messages.commissioning_messages,
                                                          MeshCopState.REJECT)
        self.collect_rloc16s()
        self.collect_ipaddrs()

        joiner.thread_start()
        simulator.go(5)
        self.assertEqual(joiner.get_state(), 'router')

    def verify(self, pv):
        """Check the captured packets of the scenario run by `test`.

        :param pv: packet verifier object; this method reads `pv.pkts`,
            `pv.vars` and calls `pv.summary.show()`.

        Each `must_next()` call below is expected to find a matching packet;
        the checks are performed in the order in which they are written.
        """
        pkts = pv.pkts
        pv.summary.show()

        # Named differently from the module-level node id COMMISSIONER to
        # avoid shadowing it; this value is what `filter_wpan_src64` matches.
        commissioner_addr64 = pv.vars['COMMISSIONER']
        joiner_router_rloc16 = pv.vars["JOINER_ROUTER_RLOC16"]
        commissioner_rloc16 = pv.vars["COMMISSIONER_RLOC16"]
        joiner_router_lla = pv.vars["JOINER_ROUTER_LLA"]

        # verify Joiner Router attaches to Commissioner
        (pkts.filter_wpan_src64(commissioner_addr64)
             .filter_ipv6_dst(joiner_router_lla)
             .filter_mle_cmd(MLE_CHILD_ID_RESPONSE)
             .must_next())

        # Discovery Response carrying all of the expected MeshCoP TLVs.
        discovery_tlvs = {
            NM_EXTENDED_PAN_ID_TLV,
            NM_NETWORK_NAME_TLV,
            NM_STEERING_DATA_TLV,
            NM_COMMISSIONER_UDP_PORT_TLV,
            NM_JOINER_UDP_PORT_TLV,
            NM_DISCOVERY_RESPONSE_TLV,
        }

        def has_discovery_tlvs(p):
            return discovery_tlvs <= set(p.thread_meshcop.tlv.type)

        discovery_rsp = (pkts.filter_mle_cmd(MLE_DISCOVERY_RESPONSE)
                             .filter(has_discovery_tlvs)
                             .must_next())

        # Step 3: Verify that the following details occur in the exchange between the
        #         Joiner, the Joiner_Router and the Commissioner

        # 1. UDP port (Specified by Commissioner in Discovery Response) is used as
        #    destination port for UDP datagrams from Joiner to Commissioner.

        # 2. Joiner sends an initial DTLS-ClientHello handshake record to the
        #    Joiner Router
        def is_joiner_client_hello(p):
            # Both UDP ports must be among the ports carried in the
            # Discovery Response found above.
            return (p.dtls.handshake.type == [HANDSHAKE_CLIENT_HELLO] and
                    p.udp.srcport in discovery_rsp.thread_meshcop.tlv.udp_port and
                    p.udp.dstport in discovery_rsp.thread_meshcop.tlv.udp_port)

        client_hello = (pkts.filter_ipv6_dst(joiner_router_lla)
                            .filter(is_joiner_client_hello)
                            .must_next())

        # TLVs shared by RLY_RX.ntf and RLY_TX.ntf.
        relay_tlvs = {
            NM_JOINER_DTLS_ENCAPSULATION_TLV,
            NM_JOINER_UDP_PORT_TLV,
            NM_JOINER_IID_TLV,
            NM_JOINER_ROUTER_LOCATOR_TLV,
        }

        def relay_message_with(required_tlvs):
            # Build the predicate used for both relay directions: required
            # TLVs present, UDP port TLV equal to the ClientHello destination
            # port, and the locator TLV equal to the Joiner Router's RLOC16.
            def matches(p):
                return (required_tlvs <= set(p.thread_meshcop.tlv.type) and
                        p.thread_meshcop.tlv.udp_port == [client_hello.udp.dstport] and
                        p.thread_meshcop.tlv.jr_locator == joiner_router_rloc16)

            return matches

        # 3. The Joiner_Router receives the initial DTLS-ClientHello handshake record and
        #    sends a RLY_RX.ntf message to the Commissioner containing:
        #    CoAP URI-Path
        #        NON POST coap://<C>/c/rx
        #    CoAP Payload
        #        Joiner DTLS Encapsulation TLV
        #        Joiner UDP Port TLV
        #        Joiner IID TLV
        #        Joiner_Router Locator TLV
        (pkts.filter_coap_request(RLY_RX_URI)
             .filter_wpan_src16(joiner_router_rloc16)
             .filter_wpan_dst16(commissioner_rloc16)
             .filter(relay_message_with(relay_tlvs))
             .must_next())

        # 4. The Commissioner receives the RLY_RX.ntf message and sends a RLY_TX.ntf message to
        #    the Joiner_Router containing:
        #    CoAP URI-Path
        #        NON POST coap://<JR>/c/tx
        #    CoAP Payload
        #        Joiner DTLS Encapsulation TLV
        #        Joiner UDP Port TLV
        #        Joiner IID TLV
        #        Joiner_Router Locator TLV
        #        Joiner_Router KEK TLV
        (pkts.filter_coap_request(RLY_TX_URI)
             .filter_wpan_src16(commissioner_rloc16)
             .filter_wpan_dst16(joiner_router_rloc16)
             .filter(relay_message_with(relay_tlvs | {NM_JOINER_ROUTER_KEK_TLV}))
             .must_next())
231
232
# Run the test case through unittest when this file is executed as a script.
if __name__ == '__main__':
    unittest.main()
235