1# 2# Copyright 2018 The Android Open Source Project 3# 4# Licensed under the Apache License, Version 2.0 (the "License"); 5# you may not use this file except in compliance with the License. 6# You may obtain a copy of the License at 7# 8# http://www.apache.org/licenses/LICENSE-2.0 9# 10# Unless required by applicable law or agreed to in writing, software 11# distributed under the License is distributed on an "AS IS" BASIS, 12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13# See the License for the specific language governing permissions and 14# limitations under the License. 15# 16"""Script for sending data to a port. 17 18This script provides a simple shell interface for sending data at run-time to a 19port. 20 21Usage: 22 1. Choose a port to use. Use 'adb forward tcp:<port> 23 tcp:<port>' to forward the port to the device. 24 2. In a separate shell, build and push the test vendor library to the device 25 using the script mentioned in option A (i.e. without the --test-channel flag 26 set). 27 3. Once logcat has started, turn Bluetooth on from the device. 28 4. Run this program, in the shell from step 1, the port, also from step 1, 29 as arguments. 30 31 32 length type source dest addr_type adv_type EIR Data 33 34 adv public connectable Name (TATA) 35send 18000000 07 010203040506 000000000000 00 00 050954415441 36 020106 37 38 length adv source dest public non-conn Name (TETE) 39send 18000000 07 010203040507 000000000000 00 03 050954455445 40 020106 41 42 length scan_rsp source dest public scan_rsp Name (TFTF) 43send 18000000 09 010203040506 4de24c67454b 00 04 050954465446 44 020106 45 46""" 47 48#!/usr/bin/env python3 49 50import binascii 51import cmd 52import random 53import socket 54import string 55import struct 56import sys 57 58DEVICE_NAME_LENGTH = 6 59DEVICE_ADDRESS_LENGTH = 6 60 61 62# Used to generate fake device names and addresses during discovery. 63def generate_random_name(): 64 return ''.join(random.SystemRandom().choice(string.ascii_uppercase + \ 65 string.digits) for _ in range(DEVICE_NAME_LENGTH)) 66 67 68def generate_random_address(): 69 return ''.join(random.SystemRandom().choice(string.digits) for _ in \ 70 range(DEVICE_ADDRESS_LENGTH)) 71 72 73class Connection(object): 74 """Simple wrapper class for a socket object. 75 76 Attributes: 77 socket: The underlying socket created for the specified address and port. 78 """ 79 80 def __init__(self, port): 81 self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 82 self._socket.connect(('localhost', port)) 83 self._socket.setblocking(0) 84 85 def close(self): 86 self._socket.close() 87 88 def send(self, data): 89 self._socket.sendall(data) 90 91 def receive(self, size): 92 return self._socket.recv(size) 93 94 95class RawPort(object): 96 """Checks outgoing commands and sends them once verified. 97 98 Attributes: 99 connection: The connection to the HCI port. 100 """ 101 102 def __init__(self, port): 103 self._connection = Connection(port) 104 self._closed = False 105 106 def close(self): 107 self._connection.close() 108 self._closed = True 109 110 def send_binary(self, args): 111 joined_args = ''.join(arg for arg in args) 112 print(joined_args) 113 packet = binascii.a2b_hex(joined_args) 114 if self._closed: 115 return 116 self._connection.send(packet) 117 118 def receive_response(self): 119 if self._closed: 120 return 121 size_chars = self._connection.receive(4) 122 if not size_chars: 123 print('Debug: No response') 124 return False 125 size_bytes = bytearray(size_chars) 126 response_size = 0 127 for i in range(0, len(size_chars) - 1): 128 response_size |= ord(size_chars[i]) << (8 * i) 129 response = self._connection.receive(response_size) 130 return response 131 132 def lint_command(self, name, args, name_size, args_size): 133 assert name_size == len(name) and args_size == len(args) 134 try: 135 name.encode('utf-8') 136 for arg in args: 137 arg.encode('utf-8') 138 except UnicodeError: 139 print('Unrecognized characters.') 140 raise 141 if name_size > 255 or args_size > 255: 142 raise ValueError # Size must be encodable in one octet. 143 for arg in args: 144 if len(arg) > 255: 145 raise ValueError # Size must be encodable in one octet. 146 147 148class RawPortShell(cmd.Cmd): 149 """Shell for sending binary data to a port.""" 150 151 def __init__(self, raw_port): 152 cmd.Cmd.__init__(self) 153 self._raw_port = raw_port 154 155 def do_send(self, args): 156 """Arguments: dev_type_str Add a new device of type dev_type_str.""" 157 self._raw_port.send_binary(args.split()) 158 159 def do_quit(self, args): 160 """Arguments: None. 161 162 Exits. 163 """ 164 self._raw_port.close() 165 print('Goodbye.') 166 return True 167 168 def do_help(self, args): 169 """Arguments: [dev_num [attr]] List the commands available, optionally filtered by device and attr.""" 170 if (len(args) == 0): 171 cmd.Cmd.do_help(self, args) 172 173 174def main(argv): 175 if len(argv) != 2: 176 print('Usage: python raw_port.py [port]') 177 return 178 try: 179 port = int(argv[1]) 180 except ValueError: 181 print('Error parsing port.') 182 else: 183 try: 184 raw_port = RawPort(port) 185 except (socket.error, e): 186 print('Error connecting to socket: %s' % e) 187 except: 188 print('Error creating (check arguments).') 189 else: 190 raw_port_shell = RawPortShell(raw_port) 191 raw_port_shell.prompt = '$ ' 192 raw_port_shell.cmdloop('Welcome to the RootCanal Console \n' + 'Type \'help\' for more information.') 193 194 195if __name__ == '__main__': 196 main(sys.argv) 197