• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""
2  Copyright (c) 2024, The OpenThread Authors.
3  All rights reserved.
4
5  Redistribution and use in source and binary forms, with or without
6  modification, are permitted provided that the following conditions are met:
7  1. Redistributions of source code must retain the above copyright
8     notice, this list of conditions and the following disclaimer.
9  2. Redistributions in binary form must reproduce the above copyright
10     notice, this list of conditions and the following disclaimer in the
11     documentation and/or other materials provided with the distribution.
12  3. Neither the name of the copyright holder nor the
13     names of its contributors may be used to endorse or promote products
14     derived from this software without specific prior written permission.
15
16  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17  AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18  IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19  ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
20  LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21  CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22  SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23  INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24  CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25  ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26  POSSIBILITY OF SUCH DAMAGE.
27"""
28
29from abc import abstractmethod
30from ble.ble_connection_constants import BBTC_SERVICE_UUID, BBTC_TX_CHAR_UUID, \
31    BBTC_RX_CHAR_UUID
32from ble.ble_stream import BleStream
33from ble.ble_stream_secure import BleStreamSecure
34from ble import ble_scanner
35from tlv.tlv import TLV
36from tlv.tcat_tlv import TcatTLVType
37from cli.command import Command, CommandResultNone, CommandResultTLV
38from dataset.dataset import ThreadDataset
39from utils import select_device_by_user_input
40from os import path
41from time import time
42from secrets import token_bytes
43from hashlib import sha256
44import hmac
45import binascii
46
47CHALLENGE_SIZE = 8
48
49
50class HelpCommand(Command):
51
52    def get_help_string(self) -> str:
53        return 'Display help and return.'
54
55    async def execute_default(self, args, context):
56        commands = context['commands']
57        for name, command in commands.items():
58            print(f'{name}')
59            command.print_help(indent=1)
60        return CommandResultNone()
61
62
63class DataNotPrepared(Exception):
64    pass
65
66
67class BleCommand(Command):
68
69    @abstractmethod
70    def get_log_string(self) -> str:
71        pass
72
73    @abstractmethod
74    def prepare_data(self, args, context):
75        pass
76
77    async def execute_default(self, args, context):
78        if 'ble_sstream' not in context or context['ble_sstream'] is None:
79            print("TCAT Device not connected.")
80            return CommandResultNone()
81        bless: BleStreamSecure = context['ble_sstream']
82
83        print(self.get_log_string())
84        try:
85            data = self.prepare_data(args, context)
86            response = await bless.send_with_resp(data)
87            if not response:
88                return
89            tlv_response = TLV.from_bytes(response)
90            self.process_response(tlv_response, context)
91            return CommandResultTLV(tlv_response)
92        except DataNotPrepared as err:
93            print('Command failed', err)
94        return CommandResultNone()
95
96    def process_response(self, tlv_response, context):
97        pass
98
99
100class HelloCommand(BleCommand):
101
102    def get_log_string(self) -> str:
103        return 'Sending hello world...'
104
105    def get_help_string(self) -> str:
106        return 'Send round trip "Hello world!" message.'
107
108    def prepare_data(self, args, context):
109        return TLV(TcatTLVType.APPLICATION.value, bytes('Hello world!', 'ascii')).to_bytes()
110
111
112class CommissionCommand(BleCommand):
113
114    def get_log_string(self) -> str:
115        return 'Commissioning...'
116
117    def get_help_string(self) -> str:
118        return 'Update the connected device with current dataset.'
119
120    def prepare_data(self, args, context):
121        dataset: ThreadDataset = context['dataset']
122        dataset_bytes = dataset.to_bytes()
123        return TLV(TcatTLVType.ACTIVE_DATASET.value, dataset_bytes).to_bytes()
124
125
126class DecommissionCommand(BleCommand):
127
128    def get_log_string(self) -> str:
129        return 'Disabling Thread and decommissioning device...'
130
131    def get_help_string(self) -> str:
132        return 'Stop Thread interface and decommission device from current network.'
133
134    def prepare_data(self, args, context):
135        return TLV(TcatTLVType.DECOMMISSION.value, bytes()).to_bytes()
136
137
138class DisconnectCommand(Command):
139
140    def get_help_string(self) -> str:
141        return 'Disconnect client from TCAT device'
142
143    async def execute_default(self, args, context):
144        if 'ble_sstream' not in context or context['ble_sstream'] is None:
145            print("TCAT Device not connected.")
146            return CommandResultNone()
147        await context['ble_sstream'].close()
148        return CommandResultNone()
149
150
151class ExtractDatasetCommand(BleCommand):
152
153    def get_log_string(self) -> str:
154        return 'Getting active dataset.'
155
156    def get_help_string(self) -> str:
157        return 'Get active dataset from device.'
158
159    def prepare_data(self, args, context):
160        return TLV(TcatTLVType.GET_ACTIVE_DATASET.value, bytes()).to_bytes()
161
162    def process_response(self, tlv_response, context):
163        if tlv_response.type == TcatTLVType.RESPONSE_W_PAYLOAD.value:
164            dataset = ThreadDataset()
165            dataset.set_from_bytes(tlv_response.value)
166            dataset.print_content()
167        else:
168            print('Dataset extraction error.')
169
170
171class GetCommissionerCertificate(BleCommand):
172
173    def get_log_string(self) -> str:
174        return 'Getting commissioner certificate.'
175
176    def get_help_string(self) -> str:
177        return 'Get commissioner certificate from device.'
178
179    def prepare_data(self, args, context):
180        return TLV(TcatTLVType.GET_COMMISSIONER_CERTIFICATE.value, bytes()).to_bytes()
181
182
183class GetDeviceIdCommand(BleCommand):
184
185    def get_log_string(self) -> str:
186        return 'Retrieving device id.'
187
188    def get_help_string(self) -> str:
189        return 'Get unique identifier for the TCAT device.'
190
191    def prepare_data(self, args, context):
192        return TLV(TcatTLVType.GET_DEVICE_ID.value, bytes()).to_bytes()
193
194
195class GetExtPanIDCommand(BleCommand):
196
197    def get_log_string(self) -> str:
198        return 'Retrieving extended PAN ID.'
199
200    def get_help_string(self) -> str:
201        return 'Get extended PAN ID that is commissioned in the active dataset.'
202
203    def prepare_data(self, args, context):
204        return TLV(TcatTLVType.GET_EXT_PAN_ID.value, bytes()).to_bytes()
205
206
207class GetProvisioningUrlCommand(BleCommand):
208
209    def get_log_string(self) -> str:
210        return 'Retrieving provisioning url.'
211
212    def get_help_string(self) -> str:
213        return 'Get a URL for an application suited to commission the TCAT device.'
214
215    def prepare_data(self, args, context):
216        return TLV(TcatTLVType.GET_PROVISIONING_URL.value, bytes()).to_bytes()
217
218
219class GetNetworkNameCommand(BleCommand):
220
221    def get_log_string(self) -> str:
222        return 'Retrieving network name.'
223
224    def get_help_string(self) -> str:
225        return 'Get the Thread network name that is commissioned in the active dataset.'
226
227    def prepare_data(self, args, context):
228        return TLV(TcatTLVType.GET_NETWORK_NAME.value, bytes()).to_bytes()
229
230
231class GetPskdHash(BleCommand):
232
233    def get_log_string(self) -> str:
234        return 'Retrieving peer PSKd hash.'
235
236    def get_help_string(self) -> str:
237        return 'Get calculated PSKd hash.'
238
239    def prepare_data(self, args, context):
240        bless: BleStreamSecure = context['ble_sstream']
241        if bless.peer_public_key is None:
242            raise DataNotPrepared("Peer certificate not present.")
243
244        challenge = token_bytes(CHALLENGE_SIZE)
245        pskd = bytes(args[0], 'utf-8')
246
247        data = TLV(TcatTLVType.GET_PSKD_HASH.value, challenge).to_bytes()
248
249        hash = hmac.new(pskd, digestmod=sha256)
250        hash.update(challenge)
251        hash.update(bless.peer_public_key)
252        self.digest = hash.digest()
253        return data
254
255    def process_response(self, tlv_response, context):
256        if tlv_response.value == self.digest:
257            print('Requested hash is valid.')
258        else:
259            print('Requested hash is NOT valid.')
260
261
262class GetRandomNumberChallenge(BleCommand):
263
264    def get_log_string(self) -> str:
265        return 'Retrieving random challenge.'
266
267    def get_help_string(self) -> str:
268        return 'Get the device random number challenge.'
269
270    def prepare_data(self, args, context):
271        return TLV(TcatTLVType.GET_RANDOM_NUMBER_CHALLENGE.value, bytes()).to_bytes()
272
273    def process_response(self, tlv_response, context):
274        bless: BleStreamSecure = context['ble_sstream']
275        if tlv_response.value != None:
276            if len(tlv_response.value) == CHALLENGE_SIZE:
277                bless.peer_challenge = tlv_response.value
278            else:
279                print('Challenge format invalid.')
280                return CommandResultNone()
281
282
283class PingCommand(Command):
284
285    def get_help_string(self) -> str:
286        return 'Send echo request to TCAT device.'
287
288    async def execute_default(self, args, context):
289        bless: BleStreamSecure = context['ble_sstream']
290        payload_size = 10
291        max_payload = 512
292        if len(args) > 0:
293            payload_size = int(args[0])
294            if payload_size > max_payload:
295                print(f'Payload size too large. Maximum supported value is {max_payload}')
296                return CommandResultNone()
297        to_send = token_bytes(payload_size)
298        data = TLV(TcatTLVType.PING.value, to_send).to_bytes()
299        elapsed_time = time()
300        response = await bless.send_with_resp(data)
301        elapsed_time = 1e3 * (time() - elapsed_time)
302        if not response:
303            return CommandResultNone()
304
305        tlv_response = TLV.from_bytes(response)
306        if tlv_response.value != to_send:
307            print("Received malformed response.")
308
309        print(f"Roundtrip time: {elapsed_time} ms")
310
311        return CommandResultTLV(tlv_response)
312
313
314class PresentHash(BleCommand):
315
316    def get_log_string(self) -> str:
317        return 'Presenting hash.'
318
319    def get_help_string(self) -> str:
320        return 'Present calculated hash.'
321
322    def prepare_data(self, args, context):
323        type = args[0]
324        code = None
325        tlv_type = None
326        if type == "pskd":
327            code = bytes(args[1], 'utf-8')
328            tlv_type = TcatTLVType.PRESENT_PSKD_HASH.value
329        elif type == "pskc":
330            code = bytes.fromhex(args[1])
331            tlv_type = TcatTLVType.PRESENT_PSKC_HASH.value
332        elif type == "install":
333            code = bytes(args[1], 'utf-8')
334            tlv_type = TcatTLVType.PRESENT_INSTALL_CODE_HASH.value
335        else:
336            raise DataNotPrepared("Hash code name incorrect.")
337        bless: BleStreamSecure = context['ble_sstream']
338        if bless.peer_public_key is None:
339            raise DataNotPrepared("Peer certificate not present.")
340
341        if bless.peer_challenge is None:
342            raise DataNotPrepared("Peer challenge not present.")
343
344        hash = hmac.new(code, digestmod=sha256)
345        hash.update(bless.peer_challenge)
346        hash.update(bless.peer_public_key)
347
348        data = TLV(tlv_type, hash.digest()).to_bytes()
349        return data
350
351
352class ScanCommand(Command):
353
354    def get_help_string(self) -> str:
355        return 'Perform scan for TCAT devices.'
356
357    async def execute_default(self, args, context):
358        if 'ble_sstream' in context and context['ble_sstream'] is not None:
359            context['ble_sstream'].close()
360            del context['ble_sstream']
361
362        tcat_devices = await ble_scanner.scan_tcat_devices()
363        device = select_device_by_user_input(tcat_devices)
364
365        if device is None:
366            return CommandResultNone()
367
368        ble_sstream = None
369
370        print(f'Connecting to {device}')
371        ble_stream = await BleStream.create(device.address, BBTC_SERVICE_UUID, BBTC_TX_CHAR_UUID, BBTC_RX_CHAR_UUID)
372        ble_sstream = BleStreamSecure(ble_stream)
373        cert_path = context['cmd_args'].cert_path if context['cmd_args'] else 'auth'
374        ble_sstream.load_cert(
375            certfile=path.join(cert_path, 'commissioner_cert.pem'),
376            keyfile=path.join(cert_path, 'commissioner_key.pem'),
377            cafile=path.join(cert_path, 'ca_cert.pem'),
378        )
379        print('Setting up secure channel...')
380        if await ble_sstream.do_handshake():
381            print('Done')
382            context['ble_sstream'] = ble_sstream
383        else:
384            print('Secure channel not established.')
385            await ble_stream.disconnect()
386        return CommandResultNone()
387
388
389class ThreadStartCommand(BleCommand):
390
391    def get_log_string(self) -> str:
392        return 'Enabling Thread...'
393
394    def get_help_string(self) -> str:
395        return 'Enable thread interface.'
396
397    def prepare_data(self, args, context):
398        return TLV(TcatTLVType.THREAD_START.value, bytes()).to_bytes()
399
400
401class ThreadStopCommand(BleCommand):
402
403    def get_log_string(self) -> str:
404        return 'Disabling Thread...'
405
406    def get_help_string(self) -> str:
407        return 'Disable thread interface.'
408
409    def prepare_data(self, args, context):
410        return TLV(TcatTLVType.THREAD_STOP.value, bytes()).to_bytes()
411
412
413class ThreadStateCommand(Command):
414
415    def __init__(self):
416        self._subcommands = {'start': ThreadStartCommand(), 'stop': ThreadStopCommand()}
417
418    def get_help_string(self) -> str:
419        return 'Manipulate state of the Thread interface of the connected device.'
420
421    async def execute_default(self, args, context):
422        print('Invalid usage. Provide a subcommand.')
423        return CommandResultNone()
424