#!/usr/bin/env python3
#
# Copyright 2015 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
r"""Script for sending data to a port.

This script provides a simple shell interface for sending data at run-time to a
port.

Usage:
    1. Choose a port to use. Use 'adb forward tcp:<port> tcp:<port>' to
       forward the port to the device.
    2. In a separate shell, build and push the test vendor library to the
       device using the script mentioned in option A (i.e. without the
       --test-channel flag set).
    3. Once logcat has started, turn Bluetooth on from the device.
    4. Run this program in the shell from step 1, passing the port (also from
       step 1) as the only argument.

    scapy is the tool we use to build packets in Python.

    >>> d = HCI_Hdr(type=1) / HCI_Command_Hdr(opcode = 0x1004) /
    Raw(load='\x01')
    >>> print(d)
    <HCI_Hdr type=Command |<HCI_Command_Hdr opcode=0x1004 |<Raw load='\x01'
    |>>>
    >>> raw(d)
    '\x01\x04\x10\x01\x01'
    >>> hexdump(d)
    0000 0104100101 .....


    >>> pkt = HCI_Hdr('\x02\x02\x20\x0a\x00\x06\x00\x01\x00') /
    L2CAP_CmdHdr(code=10, id=2, len=2) /L2CAP_InfoReq(type=2)
    >>> pkt
    <HCI_Hdr type=ACL Data |<HCI_ACL_Hdr handle=2 PB=0 BC=2 len=10 |<L2CAP_Hdr
    len=6 cid=control |<L2CAP_CmdHdr code=info_req id=2 len=2 |<L2CAP_InfoReq
    type=FEAT_MASK |>>>>>
    >>> pkt = HCI_Hdr(type="ACL Data") / HCI_ACL_Hdr(handle=2, PB=0, BC=2,
    len=10) / L2CAP_Hdr(len=6, cid="control") / L2CAP_CmdHdr(code="info_req",
    id=2, len=2) / L2CAP_InfoReq(type="FEAT_MASK")
    >>> raw(pkt)
    '\x02\x02 \n\x00\x06\x00\x01\x00\n\x02\x02\x00\x02\x00'
    >>> hexdump(pkt)
    0000 0202200A00060001000A0202000200 .. ............


"""

import binascii
import cmd
import random
import socket
import string
import struct
import sys
from scapy.all import *

# Deliberately placed AFTER the scapy star import: the star import is not
# guaranteed to provide a callable named `select`, and RawPort.receive_response
# needs the standard-library function.
from select import select

# --- Add some more SCAPY stuff ---


class HCI_Cmd_Connect(Packet):
    """scapy layer holding the parameters of a "Connect" HCI command.

    NOTE(review): the fourth field used to be a second field named
    "page_scan_repetition_mode" whose only enum value was "Reserved". Two
    fields sharing one name cannot be set independently, so it is now named
    "reserved"; its width and default are unchanged.
    """

    name = "Connect"
    fields_desc = [
        ByteEnumField("filter", 0, {0: "address"}),
        LEShortField("packet_type", 8),
        ByteEnumField("page_scan_repetition_mode", 0, {
            0: "R0",
            1: "R1",
            2: "R2"
        }),
        ByteEnumField("reserved", 0, {0: "Reserved"}),
        LEShortField("clock_offset", 0),
        ByteEnumField("allow_role_switch", 0, {
            0: "false",
            1: "true"
        }),
    ]


class HCI_Cmd_Inquiry(Packet):
    """scapy layer holding the parameters of an "Inquiry" HCI command.

    The three LAP bytes default to 0x00, 0x8B, 0x9E; "length" defaults to 1
    and "max_responses" to 0.
    """

    name = "Inquiry"
    fields_desc = [
        XByteField("LAP0", 0),
        XByteField("LAP1", 0x8B),
        XByteField("LAP2", 0x9E),
        ByteField("length", 1),
        ByteField("max_responses", 0),
    ]


# --- END SCAPY stuff ---


class Connection(object):
    """Simple wrapper class for a TCP socket connected to localhost.

    Attributes:
        _socket: The underlying socket, connected to ("localhost", port).
    """

    def __init__(self, port):
        """Open a TCP connection to `port` on localhost.

        Raises:
            socket.error (OSError): if the connection cannot be established.
        """
        self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            self._socket.connect(("localhost", port))
        except Exception:
            # Do not leak the file descriptor when the connect fails.
            self._socket.close()
            raise

    def close(self):
        """Close the underlying socket."""
        self._socket.close()

    def send(self, data):
        """Send all of `data` (bytes) over the socket."""
        self._socket.sendall(data)

    def receive(self, size):
        """Return up to `size` bytes read from the socket."""
        return self._socket.recv(size)


class RawPort(object):
    """Converts outgoing packets to binary and sends them.

    Attributes:
        _connection: The Connection to the HCI port.
    """

    def __init__(self, port):
        """Connect to `port` on localhost."""
        self._connection = Connection(port)

    def close(self):
        """Close the connection to the port."""
        self._connection.close()

    def send_binary(self, args):
        """Concatenate hex strings, decode them to bytes and send them.

        Args:
            args: iterable of strings containing hexadecimal digits, e.g.
                ["01", "0410", "01"]. They are joined without a separator.

        The joined string is echoed to stdout. If it is not valid hex (odd
        length or non-hex characters) an error is printed and nothing is
        sent, so a typo does not terminate the interactive shell.
        """
        joined_args = "".join(args)
        print(joined_args)
        try:
            packet = binascii.a2b_hex(joined_args)
        except ValueError as e:  # binascii.Error is a ValueError subclass.
            print("Invalid hex data: %s" % e)
            return
        self._connection.send(packet)

    def receive_response(self):
        """Wait up to 1.5 seconds for data and return it.

        Returns:
            The bytes read (at most 512) when the socket became readable, or
            False when the socket reported an error or the wait timed out.
        """
        sock = self._connection._socket
        ready_to_read, _, in_error = select([sock], [], [sock], 1.5)
        if in_error:
            print("Error")
            return False
        if ready_to_read:
            print("Ready to Read")
            type_str = self._connection.receive(512)
            print(len(type_str))
            print(type_str)
            return type_str
        print("Returning false at the end")
        return False


class RawPortShell(cmd.Cmd):
    """Shell for sending binary data to a port."""

    def __init__(self, raw_port):
        """Create a shell that sends its commands through `raw_port`."""
        cmd.Cmd.__init__(self)
        self._raw_port = raw_port

    def do_send(self, args):
        """Arguments: hex_bytes [hex_bytes ...]

        Send the given hexadecimal bytes to the port (whitespace is ignored).
        """
        self._raw_port.send_binary(args.split())

    def do_scan(self, args):
        """Arguments: hex_bytes [hex_bytes ...]

        Send the given hexadecimal bytes to the port (currently identical to
        'send').
        """
        self._raw_port.send_binary(args.split())

    def do_quit(self, args):
        """Arguments: None.

        Exits.
        """
        self._raw_port.close()
        print("Goodbye.")
        return True

    def do_help(self, args):
        """Arguments: [command]

        List the available commands, or show the help text of one command.
        """
        # Always delegate; previously 'help <command>' printed nothing.
        cmd.Cmd.do_help(self, args)

    def postcmd(self, stop, line):
        """Called after each command; prints any response from the port.

        Args:
            stop: whether the command asked the shell to stop.
            line: the input line that was just executed.

        Returns:
            True to stop the command loop, False to continue.
        """
        if stop:
            return True
        response = self._raw_port.receive_response()
        print(response)
        return False


def main(argv):
    """Parse the port from `argv`, connect to it and run the shell.

    Args:
        argv: the full argument vector; argv[1] must be the TCP port number.
    """
    if len(argv) != 2:
        print("Usage: python raw_port.py [port]")
        return
    try:
        port = int(argv[1])
    except ValueError:
        print("Error parsing port.")
        return
    try:
        raw_port = RawPort(port)
    except socket.error as e:
        print("Error connecting to socket: %s" % e)
        return
    except Exception:
        # e.g. OverflowError for a port outside 0-65535.
        print("Error creating (check arguments).")
        return
    raw_port_shell = RawPortShell(raw_port)
    raw_port_shell.prompt = "$ "
    try:
        raw_port_shell.cmdloop("Welcome to the RootCanal Console \n" +
                               "Type 'help' for more information.")
    finally:
        # Closing twice is harmless, so this is safe after 'quit' as well and
        # also covers Ctrl-C or an unexpected exception.
        raw_port.close()


if __name__ == "__main__":
    main(sys.argv)