"""
  Copyright (c) 2024, The OpenThread Authors.
  All rights reserved.

  Redistribution and use in source and binary forms, with or without
  modification, are permitted provided that the following conditions are met:
  1. Redistributions of source code must retain the above copyright
     notice, this list of conditions and the following disclaimer.
  2. Redistributions in binary form must reproduce the above copyright
     notice, this list of conditions and the following disclaimer in the
     documentation and/or other materials provided with the distribution.
  3. Neither the name of the copyright holder nor the
     names of its contributors may be used to endorse or promote products
     derived from this software without specific prior written permission.

  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
  AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
  IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
  ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
  LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
  CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
  SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
  INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
  CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
  ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
  POSSIBILITY OF SUCH DAMAGE.
"""

from abc import abstractmethod
from ble.ble_connection_constants import BBTC_SERVICE_UUID, BBTC_TX_CHAR_UUID, \
    BBTC_RX_CHAR_UUID
from ble.ble_stream import BleStream
from ble.ble_stream_secure import BleStreamSecure
from ble import ble_scanner
from tlv.tlv import TLV
from tlv.tcat_tlv import TcatTLVType
from cli.command import Command, CommandResultNone, CommandResultTLV
from dataset.dataset import ThreadDataset
from utils import select_device_by_user_input
from os import path
from time import time
from secrets import token_bytes
from hashlib import sha256
import hmac
import binascii

# Length in bytes of the random challenges exchanged with the peer device.
CHALLENGE_SIZE = 8


def _get_secure_stream(context):
    """Return the secure BLE stream stored in ``context``, or ``None``.

    ``None`` is returned both when the ``'ble_sstream'`` key is absent and
    when it is present but set to ``None``.
    """
    if 'ble_sstream' not in context:
        return None
    return context['ble_sstream']


class HelpCommand(Command):
    """Print every registered command together with its help text."""

    def get_help_string(self) -> str:
        return 'Display help and return.'

    async def execute_default(self, args, context):
        """Print the name and help of each entry in ``context['commands']``."""
        for name, command in context['commands'].items():
            print(f'{name}')
            command.print_help(indent=1)
        return CommandResultNone()


class DataNotPrepared(Exception):
    """Raised by ``prepare_data`` when a request cannot be built.

    ``BleCommand.execute_default`` catches it and reports the message instead
    of sending anything to the device.
    """


class BleCommand(Command):
    """Base class for commands that send one TLV and wait for one response.

    Subclasses provide the log line (``get_log_string``) and the request
    bytes (``prepare_data``); they may override ``process_response`` to act
    on the parsed reply.
    """

    @abstractmethod
    def get_log_string(self) -> str:
        """Return the message printed right before the request is sent."""

    @abstractmethod
    def prepare_data(self, args, context):
        """Return the request bytes to send.

        Raises:
            DataNotPrepared: if the request cannot be built from ``args`` and
                ``context``.
        """

    async def execute_default(self, args, context):
        """Send the prepared request and return the parsed response.

        Returns:
            ``CommandResultTLV`` wrapping the parsed response on success;
            ``CommandResultNone`` when no device is connected, when the
            response is empty, or when ``DataNotPrepared`` was raised.
        """
        bless: BleStreamSecure = _get_secure_stream(context)
        if bless is None:
            print("TCAT Device not connected.")
            return CommandResultNone()

        print(self.get_log_string())
        try:
            data = self.prepare_data(args, context)
            response = await bless.send_with_resp(data)
            if not response:
                # Always hand back a result object; a bare ``return`` here
                # used to leak ``None`` to the caller.
                return CommandResultNone()
            tlv_response = TLV.from_bytes(response)
            self.process_response(tlv_response, context)
            return CommandResultTLV(tlv_response)
        except DataNotPrepared as err:
            print('Command failed', err)
        return CommandResultNone()

    def process_response(self, tlv_response, context):
        """Hook invoked with the parsed response; does nothing by default."""


class HelloCommand(BleCommand):
    """Send an APPLICATION TLV carrying the ASCII text ``Hello world!``."""

    def get_log_string(self) -> str:
        return 'Sending hello world...'

    def get_help_string(self) -> str:
        return 'Send round trip "Hello world!" message.'

    def prepare_data(self, args, context):
        return TLV(TcatTLVType.APPLICATION.value, bytes('Hello world!', 'ascii')).to_bytes()


class CommissionCommand(BleCommand):
    """Send the dataset held in ``context['dataset']`` as an ACTIVE_DATASET TLV."""

    def get_log_string(self) -> str:
        return 'Commissioning...'

    def get_help_string(self) -> str:
        return 'Update the connected device with current dataset.'

    def prepare_data(self, args, context):
        dataset: ThreadDataset = context['dataset']
        return TLV(TcatTLVType.ACTIVE_DATASET.value, dataset.to_bytes()).to_bytes()


class DecommissionCommand(BleCommand):
    """Send an empty DECOMMISSION TLV."""

    def get_log_string(self) -> str:
        return 'Disabling Thread and decommissioning device...'

    def get_help_string(self) -> str:
        return 'Stop Thread interface and decommission device from current network.'

    def prepare_data(self, args, context):
        return TLV(TcatTLVType.DECOMMISSION.value, bytes()).to_bytes()


class DisconnectCommand(Command):
    """Close the secure stream to the currently connected device."""

    def get_help_string(self) -> str:
        return 'Disconnect client from TCAT device'

    async def execute_default(self, args, context):
        """Close the secure stream if one is present in ``context``."""
        bless = _get_secure_stream(context)
        if bless is None:
            print("TCAT Device not connected.")
            return CommandResultNone()
        await bless.close()
        return CommandResultNone()


class ExtractDatasetCommand(BleCommand):
    """Request the active dataset and print it."""

    def get_log_string(self) -> str:
        return 'Getting active dataset.'

    def get_help_string(self) -> str:
        return 'Get active dataset from device.'

    def prepare_data(self, args, context):
        return TLV(TcatTLVType.GET_ACTIVE_DATASET.value, bytes()).to_bytes()

    def process_response(self, tlv_response, context):
        """Parse and print the dataset when the reply is RESPONSE_W_PAYLOAD."""
        if tlv_response.type != TcatTLVType.RESPONSE_W_PAYLOAD.value:
            print('Dataset extraction error.')
            return
        dataset = ThreadDataset()
        dataset.set_from_bytes(tlv_response.value)
        dataset.print_content()


class GetCommissionerCertificate(BleCommand):
    """Send an empty GET_COMMISSIONER_CERTIFICATE TLV."""

    def get_log_string(self) -> str:
        return 'Getting commissioner certificate.'

    def get_help_string(self) -> str:
        return 'Get commissioner certificate from device.'

    def prepare_data(self, args, context):
        return TLV(TcatTLVType.GET_COMMISSIONER_CERTIFICATE.value, bytes()).to_bytes()


class GetDeviceIdCommand(BleCommand):
    """Send an empty GET_DEVICE_ID TLV."""

    def get_log_string(self) -> str:
        return 'Retrieving device id.'

    def get_help_string(self) -> str:
        return 'Get unique identifier for the TCAT device.'

    def prepare_data(self, args, context):
        return TLV(TcatTLVType.GET_DEVICE_ID.value, bytes()).to_bytes()


class GetExtPanIDCommand(BleCommand):
    """Send an empty GET_EXT_PAN_ID TLV."""

    def get_log_string(self) -> str:
        return 'Retrieving extended PAN ID.'

    def get_help_string(self) -> str:
        return 'Get extended PAN ID that is commissioned in the active dataset.'

    def prepare_data(self, args, context):
        return TLV(TcatTLVType.GET_EXT_PAN_ID.value, bytes()).to_bytes()


class GetProvisioningUrlCommand(BleCommand):
    """Send an empty GET_PROVISIONING_URL TLV."""

    def get_log_string(self) -> str:
        return 'Retrieving provisioning url.'

    def get_help_string(self) -> str:
        return 'Get a URL for an application suited to commission the TCAT device.'

    def prepare_data(self, args, context):
        return TLV(TcatTLVType.GET_PROVISIONING_URL.value, bytes()).to_bytes()


class GetNetworkNameCommand(BleCommand):
    """Send an empty GET_NETWORK_NAME TLV."""

    def get_log_string(self) -> str:
        return 'Retrieving network name.'

    def get_help_string(self) -> str:
        return 'Get the Thread network name that is commissioned in the active dataset.'

    def prepare_data(self, args, context):
        return TLV(TcatTLVType.GET_NETWORK_NAME.value, bytes()).to_bytes()


class GetPskdHash(BleCommand):
    """Ask the peer for its PSKd hash and verify it locally.

    The expected value is HMAC-SHA256 keyed with the PSKd (``args[0]``,
    UTF-8 encoded) over a fresh random challenge followed by the peer public
    key. The challenge is sent in a GET_PSKD_HASH TLV.
    """

    def get_log_string(self) -> str:
        return 'Retrieving peer PSKd hash.'

    def get_help_string(self) -> str:
        return 'Get calculated PSKd hash.'

    def prepare_data(self, args, context):
        """Build the challenge TLV and remember the digest expected in reply.

        Raises:
            DataNotPrepared: if the peer public key is unknown or no PSKd
                argument was supplied.
        """
        bless: BleStreamSecure = context['ble_sstream']
        if bless.peer_public_key is None:
            raise DataNotPrepared("Peer certificate not present.")
        if not args:
            # Without this guard a missing argument surfaced as IndexError,
            # which execute_default does not handle.
            raise DataNotPrepared("PSKd not provided.")

        challenge = token_bytes(CHALLENGE_SIZE)
        pskd = bytes(args[0], 'utf-8')

        expected = hmac.new(pskd, digestmod=sha256)
        expected.update(challenge)
        expected.update(bless.peer_public_key)
        # Stored on the instance so process_response can compare against it.
        self.digest = expected.digest()

        return TLV(TcatTLVType.GET_PSKD_HASH.value, challenge).to_bytes()

    def process_response(self, tlv_response, context):
        """Report whether the returned value equals the expected digest."""
        if tlv_response.value == self.digest:
            print('Requested hash is valid.')
        else:
            print('Requested hash is NOT valid.')


class GetRandomNumberChallenge(BleCommand):
    """Request a random challenge and store it on the secure stream."""

    def get_log_string(self) -> str:
        return 'Retrieving random challenge.'

    def get_help_string(self) -> str:
        return 'Get the device random number challenge.'

    def prepare_data(self, args, context):
        return TLV(TcatTLVType.GET_RANDOM_NUMBER_CHALLENGE.value, bytes()).to_bytes()

    def process_response(self, tlv_response, context):
        """Save a ``CHALLENGE_SIZE``-byte value as ``peer_challenge``.

        A value of any other length is reported as invalid and not stored.
        """
        bless: BleStreamSecure = context['ble_sstream']
        challenge = tlv_response.value
        if challenge is not None:
            if len(challenge) == CHALLENGE_SIZE:
                bless.peer_challenge = challenge
            else:
                print('Challenge format invalid.')
        return CommandResultNone()


class PingCommand(Command):
    """Send a PING TLV with a random payload and time the round trip."""

    def get_help_string(self) -> str:
        return 'Send echo request to TCAT device.'

    async def execute_default(self, args, context):
        """Ping the device with ``args[0]`` random bytes (10 by default).

        Returns:
            ``CommandResultTLV`` with the parsed response, or
            ``CommandResultNone`` when no device is connected, the payload
            size is invalid, or the response is empty.
        """
        bless: BleStreamSecure = _get_secure_stream(context)
        if bless is None:
            print("TCAT Device not connected.")
            return CommandResultNone()

        payload_size = 10
        max_payload = 512
        if len(args) > 0:
            try:
                payload_size = int(args[0])
            except ValueError:
                print('Payload size must be an integer.')
                return CommandResultNone()
            if payload_size < 0:
                # token_bytes() rejects negative sizes with ValueError.
                print('Payload size must not be negative.')
                return CommandResultNone()
            if payload_size > max_payload:
                print(f'Payload size too large. Maximum supported value is {max_payload}')
                return CommandResultNone()

        to_send = token_bytes(payload_size)
        data = TLV(TcatTLVType.PING.value, to_send).to_bytes()
        started_at = time()
        response = await bless.send_with_resp(data)
        elapsed_time = 1e3 * (time() - started_at)  # seconds -> milliseconds
        if not response:
            return CommandResultNone()

        tlv_response = TLV.from_bytes(response)
        if tlv_response.value != to_send:
            print("Received malformed response.")

        print(f"Roundtrip time: {elapsed_time} ms")

        return CommandResultTLV(tlv_response)


class PresentHash(BleCommand):
    """Present a hash of a PSKd, PSKc or install code to the peer.

    Usage: ``<pskd|pskc|install> <code>``. The hash is HMAC-SHA256 keyed with
    the code over the stored peer challenge followed by the peer public key.
    The PSKc is given as a hex string; the other codes are UTF-8 encoded.
    """

    def get_log_string(self) -> str:
        return 'Presenting hash.'

    def get_help_string(self) -> str:
        return 'Present calculated hash.'

    def prepare_data(self, args, context):
        """Return the PRESENT_*_HASH TLV bytes for the selected code.

        Raises:
            DataNotPrepared: if arguments are missing or malformed, the hash
                name is unknown, or the peer public key / peer challenge has
                not been obtained yet.
        """
        if not args:
            raise DataNotPrepared("Hash code name not provided.")
        hash_kind = args[0]

        if hash_kind == "pskd":
            tlv_type = TcatTLVType.PRESENT_PSKD_HASH.value
        elif hash_kind == "pskc":
            tlv_type = TcatTLVType.PRESENT_PSKC_HASH.value
        elif hash_kind == "install":
            tlv_type = TcatTLVType.PRESENT_INSTALL_CODE_HASH.value
        else:
            raise DataNotPrepared("Hash code name incorrect.")

        if len(args) < 2:
            raise DataNotPrepared("Hash code value not provided.")
        if hash_kind == "pskc":
            try:
                code = bytes.fromhex(args[1])
            except ValueError as err:
                raise DataNotPrepared("PSKc is not a valid hexadecimal string.") from err
        else:
            code = bytes(args[1], 'utf-8')

        bless: BleStreamSecure = context['ble_sstream']
        if bless.peer_public_key is None:
            raise DataNotPrepared("Peer certificate not present.")

        if bless.peer_challenge is None:
            raise DataNotPrepared("Peer challenge not present.")

        mac = hmac.new(code, digestmod=sha256)
        mac.update(bless.peer_challenge)
        mac.update(bless.peer_public_key)

        return TLV(tlv_type, mac.digest()).to_bytes()


class ScanCommand(Command):
    """Scan for devices, let the user pick one and open a secure channel."""

    def get_help_string(self) -> str:
        return 'Perform scan for TCAT devices.'

    async def execute_default(self, args, context):
        """Replace any existing connection with one to a newly selected device.

        On a successful handshake the new secure stream is stored in
        ``context['ble_sstream']``; otherwise the BLE stream is disconnected.
        """
        previous = _get_secure_stream(context)
        if previous is not None:
            # close() must be awaited (as DisconnectCommand does); calling it
            # without await never actually closes the old connection.
            await previous.close()
            del context['ble_sstream']

        tcat_devices = await ble_scanner.scan_tcat_devices()
        device = select_device_by_user_input(tcat_devices)

        if device is None:
            return CommandResultNone()

        print(f'Connecting to {device}')
        ble_stream = await BleStream.create(
            device.address,
            BBTC_SERVICE_UUID,
            BBTC_TX_CHAR_UUID,
            BBTC_RX_CHAR_UUID,
        )
        ble_sstream = BleStreamSecure(ble_stream)
        # Fall back to the 'auth' directory when no command-line args object
        # is available.
        cert_path = context['cmd_args'].cert_path if context['cmd_args'] else 'auth'
        ble_sstream.load_cert(
            certfile=path.join(cert_path, 'commissioner_cert.pem'),
            keyfile=path.join(cert_path, 'commissioner_key.pem'),
            cafile=path.join(cert_path, 'ca_cert.pem'),
        )
        print('Setting up secure channel...')
        if await ble_sstream.do_handshake():
            print('Done')
            context['ble_sstream'] = ble_sstream
        else:
            print('Secure channel not established.')
            await ble_stream.disconnect()
        return CommandResultNone()


class ThreadStartCommand(BleCommand):
    """Send an empty THREAD_START TLV."""

    def get_log_string(self) -> str:
        return 'Enabling Thread...'

    def get_help_string(self) -> str:
        return 'Enable thread interface.'

    def prepare_data(self, args, context):
        return TLV(TcatTLVType.THREAD_START.value, bytes()).to_bytes()


class ThreadStopCommand(BleCommand):
    """Send an empty THREAD_STOP TLV."""

    def get_log_string(self) -> str:
        return 'Disabling Thread...'

    def get_help_string(self) -> str:
        return 'Disable thread interface.'

    def prepare_data(self, args, context):
        return TLV(TcatTLVType.THREAD_STOP.value, bytes()).to_bytes()


class ThreadStateCommand(Command):
    """Parent command exposing the ``start`` and ``stop`` subcommands."""

    def __init__(self):
        self._subcommands = {'start': ThreadStartCommand(), 'stop': ThreadStopCommand()}

    def get_help_string(self) -> str:
        return 'Manipulate state of the Thread interface of the connected device.'

    async def execute_default(self, args, context):
        """Report incorrect usage; this command only acts via a subcommand."""
        print('Invalid usage. Provide a subcommand.')
        return CommandResultNone()