1""" 2 Copyright (c) 2024, The OpenThread Authors. 3 All rights reserved. 4 5 Redistribution and use in source and binary forms, with or without 6 modification, are permitted provided that the following conditions are met: 7 1. Redistributions of source code must retain the above copyright 8 notice, this list of conditions and the following disclaimer. 9 2. Redistributions in binary form must reproduce the above copyright 10 notice, this list of conditions and the following disclaimer in the 11 documentation and/or other materials provided with the distribution. 12 3. Neither the name of the copyright holder nor the 13 names of its contributors may be used to endorse or promote products 14 derived from this software without specific prior written permission. 15 16 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 17 AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 18 IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 19 ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE 20 LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 21 CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 22 SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 23 INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 24 CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 25 ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 26 POSSIBILITY OF SUCH DAMAGE. 27""" 28 29from ble.ble_connection_constants import BBTC_SERVICE_UUID, BBTC_TX_CHAR_UUID, \ 30 BBTC_RX_CHAR_UUID, SERVER_COMMON_NAME 31from ble.ble_stream import BleStream 32from ble.ble_stream_secure import BleStreamSecure 33from ble import ble_scanner 34from tlv.tlv import TLV 35from tlv.tcat_tlv import TcatTLVType 36from cli.command import Command, CommandResultNone, CommandResultTLV 37from dataset.dataset import ThreadDataset 38from utils import select_device_by_user_input 39from os import path 40 41 42class HelpCommand(Command): 43 44 def get_help_string(self) -> str: 45 return 'Display help and return.' 46 47 async def execute_default(self, args, context): 48 commands = context['commands'] 49 for name, command in commands.items(): 50 print(f'{name}') 51 command.print_help(indent=1) 52 return CommandResultNone() 53 54 55class HelloCommand(Command): 56 57 def get_help_string(self) -> str: 58 return 'Send round trip "Hello world!" message.' 59 60 async def execute_default(self, args, context): 61 bless: BleStreamSecure = context['ble_sstream'] 62 print('Sending hello world...') 63 data = TLV(TcatTLVType.APPLICATION.value, bytes('Hello world!', 'ascii')).to_bytes() 64 response = await bless.send_with_resp(data) 65 if not response: 66 return 67 tlv_response = TLV.from_bytes(response) 68 return CommandResultTLV(tlv_response) 69 70 71class CommissionCommand(Command): 72 73 def get_help_string(self) -> str: 74 return 'Update the connected device with current dataset.' 75 76 async def execute_default(self, args, context): 77 bless: BleStreamSecure = context['ble_sstream'] 78 dataset: ThreadDataset = context['dataset'] 79 80 print('Commissioning...') 81 dataset_bytes = dataset.to_bytes() 82 data = TLV(TcatTLVType.ACTIVE_DATASET.value, dataset_bytes).to_bytes() 83 response = await bless.send_with_resp(data) 84 if not response: 85 return 86 tlv_response = TLV.from_bytes(response) 87 return CommandResultTLV(tlv_response) 88 89 90class ThreadStartCommand(Command): 91 92 def get_help_string(self) -> str: 93 return 'Enable thread interface.' 94 95 async def execute_default(self, args, context): 96 bless: BleStreamSecure = context['ble_sstream'] 97 98 print('Enabling Thread...') 99 data = TLV(TcatTLVType.THREAD_START.value, bytes()).to_bytes() 100 response = await bless.send_with_resp(data) 101 if not response: 102 return 103 tlv_response = TLV.from_bytes(response) 104 return CommandResultTLV(tlv_response) 105 106 107class ThreadStopCommand(Command): 108 109 def get_help_string(self) -> str: 110 return 'Disable thread interface.' 111 112 async def execute_default(self, args, context): 113 bless: BleStreamSecure = context['ble_sstream'] 114 print('Disabling Thread...') 115 data = TLV(TcatTLVType.THREAD_STOP.value, bytes()).to_bytes() 116 response = await bless.send_with_resp(data) 117 if not response: 118 return 119 tlv_response = TLV.from_bytes(response) 120 return CommandResultTLV(tlv_response) 121 122 123class ThreadStateCommand(Command): 124 125 def __init__(self): 126 self._subcommands = {'start': ThreadStartCommand(), 'stop': ThreadStopCommand()} 127 128 def get_help_string(self) -> str: 129 return 'Manipulate state of the Thread interface of the connected device.' 130 131 async def execute_default(self, args, context): 132 print('Invalid usage. Provide a subcommand.') 133 return CommandResultNone() 134 135 136class ScanCommand(Command): 137 138 def get_help_string(self) -> str: 139 return 'Perform scan for TCAT devices.' 140 141 async def execute_default(self, args, context): 142 if not (context['ble_sstream'] is None): 143 del context['ble_sstream'] 144 145 tcat_devices = await ble_scanner.scan_tcat_devices() 146 device = select_device_by_user_input(tcat_devices) 147 148 if device is None: 149 return CommandResultNone() 150 151 ble_sstream = None 152 153 print(f'Connecting to {device}') 154 ble_stream = await BleStream.create(device.address, BBTC_SERVICE_UUID, BBTC_TX_CHAR_UUID, BBTC_RX_CHAR_UUID) 155 ble_sstream = BleStreamSecure(ble_stream) 156 ble_sstream.load_cert( 157 certfile=path.join('auth', 'commissioner_cert.pem'), 158 keyfile=path.join('auth', 'commissioner_key.pem'), 159 cafile=path.join('auth', 'ca_cert.pem'), 160 ) 161 162 print('Setting up secure channel...') 163 await ble_sstream.do_handshake(hostname=SERVER_COMMON_NAME) 164 print('Done') 165 context['ble_sstream'] = ble_sstream 166