#!/usr/bin/env python3
#
# Copyright 2015 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Script for sending testing parameters and commands to a Bluetooth device.

This script provides a simple shell interface for sending data at run-time to a
Bluetooth device. It is intended to be used in tandem with the test vendor
library project.

Usage:
  Option A: Script
    1. Run build_and_run.sh in scripts/ with the --test-channel flag set and
       the port to use for the test channel.
  Option B: Manual
    1. Choose a port to use for the test channel. Use 'adb forward tcp:<port>
       tcp:<port>' to forward the port to the device.
    2. In a separate shell, build and push the test vendor library to the
       device using the script mentioned in option A (i.e. without the
       --test-channel flag set).
    3. Once logcat has started, turn Bluetooth on from the device.
    4. Run this program in the shell from step 1, passing the port (also from
       step 1) as its only argument.
"""

import cmd
import random
import socket
import string
import struct
import sys
import time

DEVICE_NAME_LENGTH = 6
DEVICE_ADDRESS_LENGTH = 6

# Every length prefix in an outgoing command is a single octet.
_MAX_FIELD_SIZE = 255

# Responses are prefixed with their size as a 4-byte little-endian integer.
_RESPONSE_SIZE = struct.Struct('<I')


# Used to generate fake device names and addresses during discovery.
def generate_random_name():
    """Return DEVICE_NAME_LENGTH random uppercase letters and digits."""
    rng = random.SystemRandom()
    alphabet = string.ascii_uppercase + string.digits
    return ''.join(rng.choice(alphabet) for _ in range(DEVICE_NAME_LENGTH))


def generate_random_address():
    """Return DEVICE_ADDRESS_LENGTH random decimal digits as a string."""
    rng = random.SystemRandom()
    return ''.join(
        rng.choice(string.digits) for _ in range(DEVICE_ADDRESS_LENGTH))


class Connection(object):
    """Simple wrapper class for a socket object.

    Attributes:
      _socket: The underlying TCP socket, connected to localhost on the
        specified port.
    """

    def __init__(self, port):
        """Connect to localhost:port; the socket is closed if that fails."""
        self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            self._socket.connect(('localhost', port))
        except Exception:
            # Do not leak the file descriptor when the connection fails.
            self._socket.close()
            raise

    def close(self):
        """Close the underlying socket."""
        self._socket.close()

    def send(self, data):
        """Send all of data.

        Args:
          data: bytes to send as-is, or a str, which is UTF-8 encoded first.
        """
        if isinstance(data, str):
            data = data.encode()
        self._socket.sendall(data)

    def receive(self, size):
        """Return at most size bytes from a single recv() call.

        An empty bytes object means the peer closed the connection.
        """
        return self._socket.recv(size)


class TestChannel(object):
    """Checks outgoing commands and sends them once verified.

    Attributes:
      _connection: The connection to the test vendor library that commands
        are sent on.
      _closed: True once close() has been called.
    """

    def __init__(self, port):
        self._connection = Connection(port)
        self._closed = False

    def close(self):
        """Close the connection and mark the channel as closed."""
        self._connection.close()
        self._closed = True

    def send_command(self, name, args):
        """Validate a command, send it, and print the response.

        Nothing is sent when the channel is closed. No response is awaited for
        the 'CLOSE_TEST_CHANNEL' command.

        Args:
          name: The command name (str).
          args: A list of str arguments.

        Raises:
          ValueError: If the command cannot be encoded (see lint_command).
        """
        self.lint_command(name, args, len(name), len(args))
        if self._closed:
            return
        self._connection.send(self._encode_command(name, args))
        if name != 'CLOSE_TEST_CHANNEL':
            print(self.receive_response().decode(errors='replace'))

    @staticmethod
    def _encode_command(name, args):
        """Return the wire format of a command as bytes.

        Layout: <name length><name><argument count>, then for each argument
        <argument length><argument>. Every length is one octet and counts
        encoded bytes, so values of 128 and above stay a single byte.
        """
        packet = bytearray()
        encoded_name = name.encode()
        packet.append(len(encoded_name))
        packet += encoded_name
        packet.append(len(args))
        for arg in args:
            encoded_arg = arg.encode()
            packet.append(len(encoded_arg))
            packet += encoded_arg
        return bytes(packet)

    def receive_response(self):
        """Read one size-prefixed response from the connection.

        Returns:
          The response payload as bytes; b'Closed' if the channel has been
          closed; or an explanatory message if the size prefix could not be
          read. The payload is shorter than announced only if the peer closes
          the connection part-way through.
        """
        if self._closed:
            return b'Closed'
        size_bytes = self._receive_exactly(_RESPONSE_SIZE.size)
        if len(size_bytes) < _RESPONSE_SIZE.size:
            return b'No response, assuming that the connection is broken'
        (response_size,) = _RESPONSE_SIZE.unpack(size_bytes)
        return self._receive_exactly(response_size)

    def _receive_exactly(self, size):
        """Read until size bytes have arrived or the peer closes the socket.

        A single recv() may legitimately return fewer bytes than requested.
        """
        chunks = []
        remaining = size
        while remaining > 0:
            chunk = self._connection.receive(remaining)
            if not chunk:
                break
            chunks.append(chunk)
            remaining -= len(chunk)
        return b''.join(chunks)

    def lint_command(self, name, args, name_size, args_size):
        """Check that a command can be encoded for the test channel.

        Args:
          name: The command name (str).
          args: A list of str arguments.
          name_size: Must equal len(name).
          args_size: Must equal len(args).

        Raises:
          ValueError: If the sizes are inconsistent, if there are more than
            255 arguments, or if the name or any argument is longer than 255
            bytes once encoded. UnicodeError (a ValueError subclass) is
            raised if a string cannot be encoded.
        """
        if name_size != len(name) or args_size != len(args):
            raise ValueError('Sizes do not match the command name/arguments.')
        try:
            encoded_fields = [name.encode()]
            encoded_fields.extend(arg.encode() for arg in args)
        except UnicodeError:
            print('Unrecognized characters.')
            raise
        # Each size must be encodable in one octet.
        if args_size > _MAX_FIELD_SIZE:
            raise ValueError(
                'Too many arguments (%d > %d).' % (args_size, _MAX_FIELD_SIZE))
        for field in encoded_fields:
            if len(field) > _MAX_FIELD_SIZE:
                raise ValueError('Name or argument too long (%d > %d bytes).' %
                                 (len(field), _MAX_FIELD_SIZE))


class TestChannelShell(cmd.Cmd):
    """Shell for sending test channel data to controller.

    Manages the test channel to the controller and defines a set of commands
    the user can send to the controller as well. These commands are processed
    parallel to commands sent from the device stack and used to provide
    additional debugging/testing capabilities.

    Attributes:
      _test_channel: The communication channel to send data to the controller.
    """

    def __init__(self, test_channel):
        cmd.Cmd.__init__(self)
        self._test_channel = test_channel

    def _send(self, name, args):
        """Split args on whitespace and send them as command `name`.

        A command that fails validation is reported to the user instead of
        terminating the shell.
        """
        try:
            self._test_channel.send_command(name, args.split())
        except ValueError as error:
            print('Invalid command: %s' % error)

    def do_add(self, args):
        """Arguments: dev_type_str

        Add a new device of type dev_type_str.
        """
        self._send('add', args)

    def do_del(self, args):
        """Arguments: device index

        Delete the device with the specified index.
        """
        self._send('del', args)

    def do_add_phy(self, args):
        """Arguments: forwarded to the controller unchanged.

        Add a new phy.
        """
        self._send('add_phy', args)

    def do_del_phy(self, args):
        """Arguments: phy index

        Delete the phy with the specified index.
        """
        self._send('del_phy', args)

    def do_add_device_to_phy(self, args):
        """Arguments: device index, phy index

        Add the device to the phy.
        """
        self._send('add_device_to_phy', args)

    def do_del_device_from_phy(self, args):
        """Arguments: forwarded to the controller unchanged.

        Remove a device from a phy. Presumably takes a device index and a phy
        index, mirroring add_device_to_phy; confirm against the controller.
        """
        self._send('del_device_from_phy', args)

    def do_add_remote(self, args):
        """Arguments: forwarded to the controller unchanged.

        Connect to a remote device at arg1@arg2.
        """
        self._send('add_remote', args)

    def do_get(self, args):
        """Arguments: dev_num attr_str

        Get the value of the attribute attr_str from device dev_num.
        """
        self._send('get', args)

    def do_set(self, args):
        """Arguments: dev_num attr_str val

        Set the value of the attribute attr_str from device dev_num equal to
        val.
        """
        self._send('set', args)

    def do_set_device_address(self, args):
        """Arguments: dev_num addr

        Set the address of device dev_num equal to addr.
        """
        self._send('set_device_address', args)

    def do_list(self, args):
        """Arguments: [dev_num [attr]]

        List the devices from the controller, optionally filtered by device
        and attr.
        """
        self._send('list', args)

    def do_set_timer_period(self, args):
        """Arguments: period_ms

        Set the timer to fire every period_ms milliseconds.
        """
        self._send('set_timer_period', args)

    def do_start_timer(self, args):
        """Arguments: None.

        Start the timer.
        """
        self._send('start_timer', args)

    def do_stop_timer(self, args):
        """Arguments: None.

        Stop the timer.
        """
        self._send('stop_timer', args)

    def do_wait(self, args):
        """Arguments: time in seconds (float).

        Pause the shell for the given time. Missing or invalid input prints a
        usage message instead of terminating the shell.
        """
        try:
            time.sleep(float(args.split()[0]))
        except (IndexError, ValueError, OverflowError):
            # No argument, not a number, negative, NaN or too large.
            print('Usage: wait <seconds>')

    def do_reset(self, args):
        """Arguments: None.

        Resets the simulation.
        """
        self._test_channel.send_command('reset', [])

    def do_end(self, args):
        """Arguments: None.

        Ends the simulation and exits.
        """
        self._test_channel.send_command('END_SIMULATION', [])
        print('Goodbye.')
        return True

    def do_quit(self, args):
        """Arguments: None.

        Exits the test channel.
        """
        self._test_channel.send_command('CLOSE_TEST_CHANNEL', [])
        self._test_channel.close()
        print('Goodbye.')
        return True

    def do_EOF(self, args):
        """Arguments: None.

        Exits the test channel when the input ends (e.g. Ctrl-D).
        """
        # Without this handler cmd.Cmd reports "Unknown syntax: EOF" forever
        # once stdin is exhausted.
        print()
        return self.do_quit(args)

    def do_help(self, args):
        """Arguments: [command arguments]

        With no arguments, list the commands of this shell. Otherwise forward
        the arguments to the controller's own 'help' command.
        """
        if not args:
            cmd.Cmd.do_help(self, args)
        else:
            self._send('help', args)

    def preloop(self):
        """Clear out the buffer.

        Reads and discards one response before the first prompt is shown.
        """
        self._test_channel.receive_response()


def main(argv):
    """Connect to the test channel and run the interactive shell.

    Args:
      argv: The program name followed by the test channel port.
    """
    if len(argv) != 2:
        print('Usage: python test_channel.py [port]')
        return
    try:
        port = int(argv[1])
    except ValueError:
        print('Error parsing port.')
        return
    try:
        test_channel = TestChannel(port)
    except socket.error as e:
        print('Error connecting to socket: %s' % e)
        return
    except Exception:
        # E.g. a port number outside the valid range.
        print('Error creating test channel (check argument).')
        return
    try:
        test_channel_shell = TestChannelShell(test_channel)
        test_channel_shell.prompt = '$ '
        test_channel_shell.cmdloop('Welcome to the RootCanal Console \n'
                                   'Type \'help\' for more information.')
    finally:
        # Release the socket even if the shell exits through an exception.
        test_channel.close()


if __name__ == '__main__':
    main(sys.argv)