• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#
2# Copyright 2015 The Android Open Source Project
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16"""Script for sending data to a port.
17
18This script provides a simple shell interface for sending data at run-time to a
19port.
20
21Usage:
22    1. Choose a port to use. Use 'adb forward tcp:<port>
23    tcp:<port>' to forward the port to the device.
24    2. In a separate shell, build and push the test vendor library to the device
25    using the script mentioned in option A (i.e. without the --test-channel flag
26    set).
27    3. Once logcat has started, turn Bluetooth on from the device.
28    4. Run this program, in the shell from step 1,  the port, also from step 1,
29    as arguments.
30
31    scapy is the tool we use to build packets in Python.
32
33    >>> d = HCI_Hdr(type=1) / HCI_Command_Hdr(opcode = 0x1004) /
34    Raw(load='\x01')
35    >>> print(d)
36    <HCI_Hdr  type=Command |<HCI_Command_Hdr  opcode=0x1004 |<Raw  load='\x01'
37    |>>>
38    >>> raw(d)
39    '\x01\x04\x10\x01\x01'
40    >>> hexdump(d)
41    0000  0104100101                       .....
42
43
44    >>> pkt = HCI_Hdr('\x02\x02\x20\x0a\x00\x06\x00\x01\x00') /
45    L2CAP_CmdHdr(code=10, id=2, len=2) /L2CAP_InfoReq(type=2)
46    >>> pkt
47    <HCI_Hdr  type=ACL Data |<HCI_ACL_Hdr  handle=2 PB=0 BC=2 len=10 |<L2CAP_Hdr
48    len=6 cid=control |<L2CAP_CmdHdr  code=info_req id=2 len=2 |<L2CAP_InfoReq
49    type=FEAT_MASK |>>>>>
50    >>> pkt = HCI_Hdr(type='ACL Data') / HCI_ACL_Hdr(handle=2, PB=0, BC=2,
51    len=10) / L2CAP_Hdr(len=6, cid='control') / L2CAP_CmdHdr(code='info_req',
52    id=2, len=2) / L2CAP_InfoReq(type='FEAT_MASK')
53    >>> raw(pkt)
54    '\x02\x02 \n\x00\x06\x00\x01\x00\n\x02\x02\x00\x02\x00'
55    >>> hexdump(pkt)
56    0000  0202200A00060001000A0202000200   .. ............
57
58
59"""
60
61#!/usr/bin/env python3
62
63import binascii
64import cmd
65import queue
66import random
67import socket
68import string
69import struct
70import sys
71
72
73class LinkLayerSocket(object):
74    """Simple wrapper class for a socket object.
75
76  Attributes:
77    socket: The underlying socket created for the specified address and port.
78  """
79
80    def __init__(self, port):
81        print('port = ' + port)
82        self.done_ = False
83        self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
84        self._socket.connect(('localhost', port))
85        # Should it be a non-blocking socket?
86        # self._socket.setblocking(0)
87        self.packets_ = queue.Queue()
88        self.rx_thread_ = threading.Thread(target=self.rx_thread_body)
89        self.rx_thread_.start()
90
91    def rx_bytes(self, size):
92        while not self.done_:
93            raw_bytes = b''
94            while len(raw_bytes) < size and not self.done_:
95                more_raw_bytes = self._socket.recv(min(size - len(raw_bytes), 2048))
96                if more_raw_bytes:
97                    raw_bytes += more_raw_bytes
98            return raw_bytes
99
100    def rx_thread_body(self):
101        while not self.done_:
102            payload_length = 0
103            # Read the size (4B), the type (1B), and the addresses (2*6B)
104            header = self.rx_bytes(17)
105            if not header:
106                continue
107            payload_length = header[0]
108            payload_length |= header[1] << 8
109            payload_length |= header[2] << 16
110            payload_length |= header[3] << 24
111            print('Rx: type_byte ' + hex(header[4]))
112            print('Rx: from ' + hex(header[5]) + ':' + hex(header[6]) + ':' + hex(header[7]) + ':' + hex(header[8]) +
113                  ':' + hex(header[9]) + ':' + hex(header[10]))
114            print('Rx: to ' + hex(header[11]) + ':' + hex(header[12]) + ':' + hex(header[13]) + ':' + hex(header[14]) +
115                  ':' + hex(header[15]) + ':' + hex(header[16]))
116            # Read the Payload
117            payload = self.rx_bytes(payload_length) if payload_length != 0 else b''
118            packet_bytes = header + payload
119            self.packets_.put(packet_bytes)
120
121    def get_packet(self):
122        if self.packets_.empty():
123            return False
124        return self.packets_.get()
125
126    def send_binary(self, args):
127        joined_args = ''.join(arg for arg in args)
128        print(joined_args)
129        packet = binascii.a2b_hex(joined_args)
130        if self._done:
131            return
132        self._connection.send(packet)
133
134    def tell_rx_thread_to_quit(self):
135        self.done_ = True
136        self.rx_thread_.join()
137
138
139class LinkLayerShell(cmd.Cmd):
140    """Shell for sending binary data to a port.
141
142  """
143
144    def __init__(self, link_layer):
145        cmd.Cmd.__init__(self)
146        self._link_layer = link_layer
147
148    def do_send(self, args):
149        """Arguments: binary representation of a packet.
150
151    """
152        self._link_layer.send_binary(args.split())
153
154    def do_quit(self, args):
155        """Arguments: None.
156
157    Exits.
158    """
159        self._link_layer.tell_rx_thread_to_quit()
160        self._link_layer.close()
161        print('Goodbye.')
162        return True
163
164    def do_help(self, args):
165        """Arguments: [dev_num [attr]] List the commands available, optionally filtered by device and attr.
166
167    """
168        if (len(args) == 0):
169            cmd.Cmd.do_help(self, args)
170
171
172def main(argv):
173    if len(argv) != 2:
174        print('Usage: python link_layer_socket.py [port]')
175        return
176    try:
177        port = int(argv[1])
178    except ValueError:
179        print('Error parsing port.')
180    else:
181        try:
182            link_layer = LinkLayerSocket(port)
183        except socket.error as e:
184            print('Error connecting to socket: %s' % e)
185        except:
186            print('Error creating (check arguments).')
187        else:
188            link_layer_shell = LinkLayerShell(link_layer)
189            link_layer_shell.prompt = '$ '
190            link_layer_shell.cmdloop('Welcome to the RootCanal LinkLayer Console \n' +
191                                     'Type \'help\' for more information.')
192
193
194if __name__ == '__main__':
195    main(sys.argv)
196