1# 2# Copyright 2015 The Android Open Source Project 3# 4# Licensed under the Apache License, Version 2.0 (the "License"); 5# you may not use this file except in compliance with the License. 6# You may obtain a copy of the License at 7# 8# http://www.apache.org/licenses/LICENSE-2.0 9# 10# Unless required by applicable law or agreed to in writing, software 11# distributed under the License is distributed on an "AS IS" BASIS, 12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13# See the License for the specific language governing permissions and 14# limitations under the License. 15# 16"""Script for sending data to a port. 17 18This script provides a simple shell interface for sending data at run-time to a 19port. 20 21Usage: 22 1. Choose a port to use. Use 'adb forward tcp:<port> 23 tcp:<port>' to forward the port to the device. 24 2. In a separate shell, build and push the test vendor library to the device 25 using the script mentioned in option A (i.e. without the --test-channel flag 26 set). 27 3. Once logcat has started, turn Bluetooth on from the device. 28 4. Run this program, in the shell from step 1, the port, also from step 1, 29 as arguments. 30 31 scapy is the tool we use to build packets in Python. 32 33 >>> d = HCI_Hdr(type=1) / HCI_Command_Hdr(opcode = 0x1004) / 34 Raw(load='\x01') 35 >>> print(d) 36 <HCI_Hdr type=Command |<HCI_Command_Hdr opcode=0x1004 |<Raw load='\x01' 37 |>>> 38 >>> raw(d) 39 '\x01\x04\x10\x01\x01' 40 >>> hexdump(d) 41 0000 0104100101 ..... 42 43 44 >>> pkt = HCI_Hdr('\x02\x02\x20\x0a\x00\x06\x00\x01\x00') / 45 L2CAP_CmdHdr(code=10, id=2, len=2) /L2CAP_InfoReq(type=2) 46 >>> pkt 47 <HCI_Hdr type=ACL Data |<HCI_ACL_Hdr handle=2 PB=0 BC=2 len=10 |<L2CAP_Hdr 48 len=6 cid=control |<L2CAP_CmdHdr code=info_req id=2 len=2 |<L2CAP_InfoReq 49 type=FEAT_MASK |>>>>> 50 >>> pkt = HCI_Hdr(type='ACL Data') / HCI_ACL_Hdr(handle=2, PB=0, BC=2, 51 len=10) / L2CAP_Hdr(len=6, cid='control') / L2CAP_CmdHdr(code='info_req', 52 id=2, len=2) / L2CAP_InfoReq(type='FEAT_MASK') 53 >>> raw(pkt) 54 '\x02\x02 \n\x00\x06\x00\x01\x00\n\x02\x02\x00\x02\x00' 55 >>> hexdump(pkt) 56 0000 0202200A00060001000A0202000200 .. ............ 57 58 59""" 60 61#!/usr/bin/env python3 62 63import binascii 64import cmd 65import queue 66import random 67import socket 68import string 69import struct 70import sys 71 72 73class LinkLayerSocket(object): 74 """Simple wrapper class for a socket object. 75 76 Attributes: 77 socket: The underlying socket created for the specified address and port. 78 """ 79 80 def __init__(self, port): 81 print('port = ' + port) 82 self.done_ = False 83 self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 84 self._socket.connect(('localhost', port)) 85 # Should it be a non-blocking socket? 86 # self._socket.setblocking(0) 87 self.packets_ = queue.Queue() 88 self.rx_thread_ = threading.Thread(target=self.rx_thread_body) 89 self.rx_thread_.start() 90 91 def rx_bytes(self, size): 92 while not self.done_: 93 raw_bytes = b'' 94 while len(raw_bytes) < size and not self.done_: 95 more_raw_bytes = self._socket.recv(min(size - len(raw_bytes), 2048)) 96 if more_raw_bytes: 97 raw_bytes += more_raw_bytes 98 return raw_bytes 99 100 def rx_thread_body(self): 101 while not self.done_: 102 payload_length = 0 103 # Read the size (4B), the type (1B), and the addresses (2*6B) 104 header = self.rx_bytes(17) 105 if not header: 106 continue 107 payload_length = header[0] 108 payload_length |= header[1] << 8 109 payload_length |= header[2] << 16 110 payload_length |= header[3] << 24 111 print('Rx: type_byte ' + hex(header[4])) 112 print('Rx: from ' + hex(header[5]) + ':' + hex(header[6]) + ':' + hex(header[7]) + ':' + hex(header[8]) + 113 ':' + hex(header[9]) + ':' + hex(header[10])) 114 print('Rx: to ' + hex(header[11]) + ':' + hex(header[12]) + ':' + hex(header[13]) + ':' + hex(header[14]) + 115 ':' + hex(header[15]) + ':' + hex(header[16])) 116 # Read the Payload 117 payload = self.rx_bytes(payload_length) if payload_length != 0 else b'' 118 packet_bytes = header + payload 119 self.packets_.put(packet_bytes) 120 121 def get_packet(self): 122 if self.packets_.empty(): 123 return False 124 return self.packets_.get() 125 126 def send_binary(self, args): 127 joined_args = ''.join(arg for arg in args) 128 print(joined_args) 129 packet = binascii.a2b_hex(joined_args) 130 if self._done: 131 return 132 self._connection.send(packet) 133 134 def tell_rx_thread_to_quit(self): 135 self.done_ = True 136 self.rx_thread_.join() 137 138 139class LinkLayerShell(cmd.Cmd): 140 """Shell for sending binary data to a port. 141 142 """ 143 144 def __init__(self, link_layer): 145 cmd.Cmd.__init__(self) 146 self._link_layer = link_layer 147 148 def do_send(self, args): 149 """Arguments: binary representation of a packet. 150 151 """ 152 self._link_layer.send_binary(args.split()) 153 154 def do_quit(self, args): 155 """Arguments: None. 156 157 Exits. 158 """ 159 self._link_layer.tell_rx_thread_to_quit() 160 self._link_layer.close() 161 print('Goodbye.') 162 return True 163 164 def do_help(self, args): 165 """Arguments: [dev_num [attr]] List the commands available, optionally filtered by device and attr. 166 167 """ 168 if (len(args) == 0): 169 cmd.Cmd.do_help(self, args) 170 171 172def main(argv): 173 if len(argv) != 2: 174 print('Usage: python link_layer_socket.py [port]') 175 return 176 try: 177 port = int(argv[1]) 178 except ValueError: 179 print('Error parsing port.') 180 else: 181 try: 182 link_layer = LinkLayerSocket(port) 183 except socket.error as e: 184 print('Error connecting to socket: %s' % e) 185 except: 186 print('Error creating (check arguments).') 187 else: 188 link_layer_shell = LinkLayerShell(link_layer) 189 link_layer_shell.prompt = '$ ' 190 link_layer_shell.cmdloop('Welcome to the RootCanal LinkLayer Console \n' + 191 'Type \'help\' for more information.') 192 193 194if __name__ == '__main__': 195 main(sys.argv) 196