• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""
2  Copyright (c) 2024, The OpenThread Authors.
3  All rights reserved.
4
5  Redistribution and use in source and binary forms, with or without
6  modification, are permitted provided that the following conditions are met:
7  1. Redistributions of source code must retain the above copyright
8     notice, this list of conditions and the following disclaimer.
9  2. Redistributions in binary form must reproduce the above copyright
10     notice, this list of conditions and the following disclaimer in the
11     documentation and/or other materials provided with the distribution.
12  3. Neither the name of the copyright holder nor the
13     names of its contributors may be used to endorse or promote products
14     derived from this software without specific prior written permission.
15
16  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17  AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18  IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19  ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
20  LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21  CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22  SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23  INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24  CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25  ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26  POSSIBILITY OF SUCH DAMAGE.
27"""
28
29from ble.ble_connection_constants import BBTC_SERVICE_UUID, BBTC_TX_CHAR_UUID, \
30    BBTC_RX_CHAR_UUID, SERVER_COMMON_NAME
31from ble.ble_stream import BleStream
32from ble.ble_stream_secure import BleStreamSecure
33from ble import ble_scanner
34from tlv.tlv import TLV
35from tlv.tcat_tlv import TcatTLVType
36from cli.command import Command, CommandResultNone, CommandResultTLV
37from dataset.dataset import ThreadDataset
38from utils import select_device_by_user_input
39from os import path
40
41
42class HelpCommand(Command):
43
44    def get_help_string(self) -> str:
45        return 'Display help and return.'
46
47    async def execute_default(self, args, context):
48        commands = context['commands']
49        for name, command in commands.items():
50            print(f'{name}')
51            command.print_help(indent=1)
52        return CommandResultNone()
53
54
55class HelloCommand(Command):
56
57    def get_help_string(self) -> str:
58        return 'Send round trip "Hello world!" message.'
59
60    async def execute_default(self, args, context):
61        bless: BleStreamSecure = context['ble_sstream']
62        print('Sending hello world...')
63        data = TLV(TcatTLVType.APPLICATION.value, bytes('Hello world!', 'ascii')).to_bytes()
64        response = await bless.send_with_resp(data)
65        if not response:
66            return
67        tlv_response = TLV.from_bytes(response)
68        return CommandResultTLV(tlv_response)
69
70
71class CommissionCommand(Command):
72
73    def get_help_string(self) -> str:
74        return 'Update the connected device with current dataset.'
75
76    async def execute_default(self, args, context):
77        bless: BleStreamSecure = context['ble_sstream']
78        dataset: ThreadDataset = context['dataset']
79
80        print('Commissioning...')
81        dataset_bytes = dataset.to_bytes()
82        data = TLV(TcatTLVType.ACTIVE_DATASET.value, dataset_bytes).to_bytes()
83        response = await bless.send_with_resp(data)
84        if not response:
85            return
86        tlv_response = TLV.from_bytes(response)
87        return CommandResultTLV(tlv_response)
88
89
90class ThreadStartCommand(Command):
91
92    def get_help_string(self) -> str:
93        return 'Enable thread interface.'
94
95    async def execute_default(self, args, context):
96        bless: BleStreamSecure = context['ble_sstream']
97
98        print('Enabling Thread...')
99        data = TLV(TcatTLVType.THREAD_START.value, bytes()).to_bytes()
100        response = await bless.send_with_resp(data)
101        if not response:
102            return
103        tlv_response = TLV.from_bytes(response)
104        return CommandResultTLV(tlv_response)
105
106
107class ThreadStopCommand(Command):
108
109    def get_help_string(self) -> str:
110        return 'Disable thread interface.'
111
112    async def execute_default(self, args, context):
113        bless: BleStreamSecure = context['ble_sstream']
114        print('Disabling Thread...')
115        data = TLV(TcatTLVType.THREAD_STOP.value, bytes()).to_bytes()
116        response = await bless.send_with_resp(data)
117        if not response:
118            return
119        tlv_response = TLV.from_bytes(response)
120        return CommandResultTLV(tlv_response)
121
122
123class ThreadStateCommand(Command):
124
125    def __init__(self):
126        self._subcommands = {'start': ThreadStartCommand(), 'stop': ThreadStopCommand()}
127
128    def get_help_string(self) -> str:
129        return 'Manipulate state of the Thread interface of the connected device.'
130
131    async def execute_default(self, args, context):
132        print('Invalid usage. Provide a subcommand.')
133        return CommandResultNone()
134
135
136class ScanCommand(Command):
137
138    def get_help_string(self) -> str:
139        return 'Perform scan for TCAT devices.'
140
141    async def execute_default(self, args, context):
142        if not (context['ble_sstream'] is None):
143            del context['ble_sstream']
144
145        tcat_devices = await ble_scanner.scan_tcat_devices()
146        device = select_device_by_user_input(tcat_devices)
147
148        if device is None:
149            return CommandResultNone()
150
151        ble_sstream = None
152
153        print(f'Connecting to {device}')
154        ble_stream = await BleStream.create(device.address, BBTC_SERVICE_UUID, BBTC_TX_CHAR_UUID, BBTC_RX_CHAR_UUID)
155        ble_sstream = BleStreamSecure(ble_stream)
156        ble_sstream.load_cert(
157            certfile=path.join('auth', 'commissioner_cert.pem'),
158            keyfile=path.join('auth', 'commissioner_key.pem'),
159            cafile=path.join('auth', 'ca_cert.pem'),
160        )
161
162        print('Setting up secure channel...')
163        await ble_sstream.do_handshake(hostname=SERVER_COMMON_NAME)
164        print('Done')
165        context['ble_sstream'] = ble_sstream
166