#!/usr/bin/env python3
#
# Copyright 2015 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Script for sending testing parameters and commands to a Bluetooth device.

This script provides a simple shell interface for sending data at run-time to a
Bluetooth device. It is intended to be used in tandem with the test vendor
library project.

Usage:
  Option A: Script
    1. Run build_and_run.sh in scripts/ with the --test-channel flag set and
       the port to use for the test channel.
  Option B: Manual
    1. Choose a port to use for the test channel. Use
       'adb forward tcp:PORT tcp:PORT' to forward the port to the device.
    2. In a separate shell, build and push the test vendor library to the
       device using the script mentioned in option A (i.e. without the
       --test-channel flag set).
    3. Once logcat has started, turn Bluetooth on from the device.
    4. Run this program, in the shell from step 1, passing the port (also
       from step 1) as its argument.
"""

import cmd
import random
import socket
import string
import struct
import sys
import time

DEVICE_NAME_LENGTH = 6
DEVICE_ADDRESS_LENGTH = 6

# Largest value that fits in the one-octet length prefixes of a command.
_MAX_OCTET = 255

# Every response is preceded by its size as a 4-byte little-endian integer.
_RESPONSE_SIZE = struct.Struct('<I')


# Used to generate fake device names and addresses during discovery.
def generate_random_name():
    """Return DEVICE_NAME_LENGTH random uppercase letters and digits."""
    rng = random.SystemRandom()
    alphabet = string.ascii_uppercase + string.digits
    return ''.join(rng.choice(alphabet) for _ in range(DEVICE_NAME_LENGTH))


def generate_random_address():
    """Return DEVICE_ADDRESS_LENGTH random decimal digits as a string."""
    rng = random.SystemRandom()
    return ''.join(
        rng.choice(string.digits) for _ in range(DEVICE_ADDRESS_LENGTH))


class Connection(object):
    """Simple wrapper class for a socket object.

    Attributes:
      _socket: The underlying TCP socket, connected to 'localhost' on the
        port given to the constructor.
    """

    def __init__(self, port):
        """Connect to localhost:port.

        Raises:
          OSError: if the connection cannot be established. The socket is
            closed before the error propagates.
        """
        self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            self._socket.connect(('localhost', port))
        except OSError:
            # Do not leak the file descriptor when the connect fails.
            self._socket.close()
            raise

    def close(self):
        """Close the underlying socket."""
        self._socket.close()

    def send(self, data):
        """Send all of data.

        Args:
          data: bytes to send verbatim, or a str, which is UTF-8 encoded
            first (kept for callers that still pass text).
        """
        if isinstance(data, str):
            data = data.encode()
        self._socket.sendall(data)

    def receive(self, size):
        """Return up to size bytes read from the socket (b'' at EOF)."""
        return self._socket.recv(size)


class TestChannel(object):
    """Checks outgoing commands and sends them once verified.

    Attributes:
      _connection: The connection to the test vendor library that commands
        are sent on.
      _closed: True once close() has been called.
    """

    def __init__(self, port):
        self._connection = Connection(port)
        self._closed = False

    def close(self):
        """Close the connection; later sends and receives become no-ops."""
        self._connection.close()
        self._closed = True

    def send_command(self, name, args):
        """Validate, encode and send a command, then print the response.

        The command is framed as: one octet holding the byte length of the
        name, the name, one octet holding the number of arguments, and then
        each argument preceded by one octet holding its byte length.

        Args:
          name: command name (str).
          args: list of str arguments.

        Raises:
          ValueError: if the command fails lint_command().
        """
        self.lint_command(name, args, len(name), len(args))

        # Build the frame as bytes. Encoding a str that contains chr(n)
        # length prefixes would turn every length >= 128 into two bytes.
        encoded_name = name.encode()
        frame = bytearray([len(encoded_name)])
        frame += encoded_name
        frame.append(len(args))
        for arg in args:
            encoded_arg = arg.encode()
            frame.append(len(encoded_arg))
            frame += encoded_arg

        if self._closed:
            return
        self._connection.send(bytes(frame))
        # No response is read back after asking to close the channel.
        if name != 'CLOSE_TEST_CHANNEL':
            print(self.receive_response().decode(errors='replace'))

    def receive_response(self):
        """Read one size-prefixed response and return its payload as bytes.

        Returns b'Closed' when the channel has been closed, and a
        diagnostic message when the size prefix cannot be read in full.
        """
        if self._closed:
            return b'Closed'
        header = self._receive_exactly(_RESPONSE_SIZE.size)
        if len(header) < _RESPONSE_SIZE.size:
            return b'No response, assuming that the connection is broken'
        return self._receive_exactly(self._parse_response_size(header))

    @staticmethod
    def _parse_response_size(header):
        """Decode the 4-byte little-endian size prefix (all four bytes)."""
        return _RESPONSE_SIZE.unpack(header)[0]

    def _receive_exactly(self, size):
        """Read until size bytes have arrived or the peer closes.

        A stream socket may deliver fewer bytes than requested per call, so
        keep reading; a short result means the connection ended early.
        """
        chunks = []
        remaining = size
        while remaining > 0:
            chunk = self._connection.receive(remaining)
            if not chunk:
                break
            chunks.append(chunk)
            remaining -= len(chunk)
        return b''.join(chunks)

    def lint_command(self, name, args, name_size, args_size):
        """Check that a command can be encoded on the wire.

        Args:
          name: command name (str).
          args: list of str arguments.
          name_size: must equal len(name).
          args_size: must equal len(args).

        Raises:
          UnicodeError: if the name or an argument cannot be UTF-8 encoded.
          ValueError: if the declared sizes are inconsistent, or a length
            does not fit in one octet.
        """
        if name_size != len(name) or args_size != len(args):
            raise ValueError('Declared sizes do not match the command.')
        try:
            encoded_sizes = [len(name.encode())]
            encoded_sizes.extend(len(arg.encode()) for arg in args)
        except UnicodeError:
            print('Unrecognized characters.')
            raise
        # Sizes must be encodable in one octet. Measure encoded bytes, not
        # characters, because that is what is actually transmitted.
        if args_size > _MAX_OCTET:
            raise ValueError(
                'Too many arguments (at most %d).' % _MAX_OCTET)
        if max(encoded_sizes) > _MAX_OCTET:
            raise ValueError(
                'Command name and arguments must each encode to at most '
                '%d bytes.' % _MAX_OCTET)


class TestChannelShell(cmd.Cmd):
    """Shell for sending test channel data to controller.

    Manages the test channel to the controller and defines a set of commands
    the user can send to the controller as well. These commands are processed
    parallel to commands sent from the device stack and used to provide
    additional debugging/testing capabilities.

    Attributes:
      _test_channel: The communication channel to send data to the
        controller.
    """

    def __init__(self, test_channel):
        cmd.Cmd.__init__(self)
        self._test_channel = test_channel

    def _send(self, name, args):
        """Send a command, reporting failures instead of ending the shell."""
        try:
            self._test_channel.send_command(name, args)
        except (ValueError, OSError) as e:
            print('Error sending command: %s' % e)

    def do_add(self, args):
        """Arguments: dev_type_str
        Add a new device of type dev_type_str.
        """
        self._send('add', args.split())

    def do_del(self, args):
        """Arguments: device index
        Delete the device with the specified index.
        """
        self._send('del', args.split())

    def do_add_phy(self, args):
        """Arguments: forwarded unchanged to the controller.
        Send the 'add_phy' command (presumably adds a new phy).
        """
        self._send('add_phy', args.split())

    def do_del_phy(self, args):
        """Arguments: phy index
        Delete the phy with the specified index.
        """
        self._send('del_phy', args.split())

    def do_add_device_to_phy(self, args):
        """Arguments: device index phy index
        Send the 'add_device_to_phy' command (presumably attaches the device
        to the phy).
        """
        self._send('add_device_to_phy', args.split())

    def do_del_device_from_phy(self, args):
        """Arguments: forwarded unchanged to the controller.
        Send the 'del_device_from_phy' command (presumably detaches a device
        from a phy).
        """
        self._send('del_device_from_phy', args.split())

    def do_add_remote(self, args):
        """Arguments: forwarded unchanged to the controller.
        Connect to a remote device.
        """
        self._send('add_remote', args.split())

    def do_get(self, args):
        """Arguments: dev_num attr_str
        Get the value of the attribute attr_str from device dev_num.
        """
        self._send('get', args.split())

    def do_set(self, args):
        """Arguments: dev_num attr_str val
        Set the value of the attribute attr_str from device dev_num equal to
        val.
        """
        self._send('set', args.split())

    def do_set_device_address(self, args):
        """Arguments: dev_num addr
        Set the address of device dev_num equal to addr.
        """
        self._send('set_device_address', args.split())

    def do_list(self, args):
        """Arguments: [dev_num [attr]]
        List the devices from the controller, optionally filtered by device
        and attr.
        """
        self._send('list', args.split())

    def do_set_timer_period(self, args):
        """Arguments: period_ms
        Set the timer to fire every period_ms milliseconds.
        """
        self._send('set_timer_period', args.split())

    def do_start_timer(self, args):
        """Arguments: None.
        Start the timer.
        """
        self._send('start_timer', args.split())

    def do_stop_timer(self, args):
        """Arguments: None.
        Stop the timer.
        """
        self._send('stop_timer', args.split())

    def do_wait(self, args):
        """Arguments: time in seconds (float).
        Pause the shell for the given time.
        """
        try:
            time.sleep(float(args.split()[0]))
        except (IndexError, ValueError):
            # Missing, non-numeric or negative duration: keep the shell
            # alive and tell the user what is expected.
            print('Usage: wait <time in seconds>')

    def do_reset(self, args):
        """Arguments: None.
        Resets the simulation.
        """
        self._send('reset', [])

    def do_end(self, args):
        """Arguments: None.
        Ends the simulation and exits.
        """
        self._send('END_SIMULATION', [])
        print('Goodbye.')
        return True

    def do_quit(self, args):
        """Arguments: None.
        Exits the test channel.
        """
        self._send('CLOSE_TEST_CHANNEL', [])
        self._test_channel.close()
        print('Goodbye.')
        return True

    def do_EOF(self, args):
        """Arguments: None.
        Exits the test channel when the input ends (e.g. Ctrl-D).
        """
        # Without this handler cmd.Cmd reports 'Unknown syntax: EOF' and
        # immediately reads EOF again, looping forever.
        print()
        return self.do_quit(args)

    def do_help(self, args):
        """Arguments: [topic ...]
        With no arguments, list the commands available in this shell.
        Otherwise forward the arguments to the controller's 'help' command.
        """
        if not args:
            cmd.Cmd.do_help(self, args)
        else:
            self._send('help', args.split())

    def preloop(self):
        """Clear out the buffer by reading the response that is pending
        before the first command is sent.
        """
        self._test_channel.receive_response()


def main(argv):
    """Run the interactive console.

    Args:
      argv: command line, i.e. [program_name, port].
    """
    if len(argv) != 2:
        print('Usage: python test_channel.py [port]')
        return
    try:
        port = int(argv[1])
    except ValueError:
        print('Error parsing port.')
        return
    try:
        test_channel = TestChannel(port)
    except socket.error as e:
        print('Error connecting to socket: %s' % e)
        return
    except Exception:
        # E.g. a port outside the valid range.
        print('Error creating test channel (check argument).')
        return
    try:
        test_channel_shell = TestChannelShell(test_channel)
        test_channel_shell.prompt = '$ '
        test_channel_shell.cmdloop('Welcome to the RootCanal Console \n' +
                                   'Type \'help\' for more information.')
    finally:
        # Release the socket however the shell ends.
        test_channel.close()


if __name__ == '__main__':
    main(sys.argv)