• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#
2# Copyright 2015 The Android Open Source Project
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16"""Script for sending data to a port.
17
18This script provides a simple shell interface for sending data at run-time to a
19port.
20
21Usage:
22    1. Choose a port to use. Use 'adb forward tcp:<port>
23    tcp:<port>' to forward the port to the device.
24    2. In a separate shell, build and push the test vendor library to the device
25    using the script mentioned in option A (i.e. without the --test-channel flag
26    set).
27    3. Once logcat has started, turn Bluetooth on from the device.
28    4. Run this program in the shell from step 1, passing the port, also from
29    step 1, as its argument.
30
31    scapy is the tool we use to build packets in Python.
32
33    >>> d = HCI_Hdr(type=1) / HCI_Command_Hdr(opcode = 0x1004) /
34    Raw(load='\x01')
35    >>> print(d)
36    <HCI_Hdr  type=Command |<HCI_Command_Hdr  opcode=0x1004 |<Raw  load='\x01'
37    |>>>
38    >>> raw(d)
39    '\x01\x04\x10\x01\x01'
40    >>> hexdump(d)
41    0000  0104100101                       .....
42
43
44    >>> pkt = HCI_Hdr('\x02\x02\x20\x0a\x00\x06\x00\x01\x00') /
45    L2CAP_CmdHdr(code=10, id=2, len=2) /L2CAP_InfoReq(type=2)
46    >>> pkt
47    <HCI_Hdr  type=ACL Data |<HCI_ACL_Hdr  handle=2 PB=0 BC=2 len=10 |<L2CAP_Hdr
48    len=6 cid=control |<L2CAP_CmdHdr  code=info_req id=2 len=2 |<L2CAP_InfoReq
49    type=FEAT_MASK |>>>>>
50    >>> pkt = HCI_Hdr(type="ACL Data") / HCI_ACL_Hdr(handle=2, PB=0, BC=2,
51    len=10) / L2CAP_Hdr(len=6, cid="control") / L2CAP_CmdHdr(code="info_req",
52    id=2, len=2) / L2CAP_InfoReq(type="FEAT_MASK")
53    >>> raw(pkt)
54    '\x02\x02 \n\x00\x06\x00\x01\x00\n\x02\x02\x00\x02\x00'
55    >>> hexdump(pkt)
56    0000  0202200A00060001000A0202000200   .. ............
57
58
59"""
60
61#!/usr/bin/env python3
62
63import binascii
64import cmd
65import random
66import socket
67import string
68import struct
69import sys
70from scapy.all import *
71""" Add some more SCAPY stuff"""
72
73
class HCI_Cmd_Connect(Packet):
    """Scapy packet layer describing the parameters of a "Connect" command.

    Field names must be unique within a layer: scapy looks fields up by name,
    so two fields sharing a name cannot be set or read independently.
    """
    name = "Connect"
    fields_desc = [
        ByteEnumField("filter", 0, {0: "address"}),
        LEShortField("packet_type", 8),
        ByteEnumField("page_scan_repetition_mode", 0, {
            0: "R0",
            1: "R1",
            2: "R2",
        }),
        # Single byte whose only named value is "Reserved". It carries its own
        # name so it does not collide with page_scan_repetition_mode above.
        ByteEnumField("reserved", 0, {0: "Reserved"}),
        LEShortField("clock_offset", 0),
        ByteEnumField("allow_role_switch", 0, {
            0: "false",
            1: "true",
        }),
    ]
91
92
class HCI_Cmd_Inquiry(Packet):
    """Scapy packet layer describing the parameters of an "Inquiry" command.

    Fields, in wire order: three hex-displayed bytes LAP0, LAP1 and LAP2
    (defaults 0x00, 0x8B, 0x9E), followed by the single-byte "length"
    (default 1) and "max_responses" (default 0).
    """
    name = "Inquiry"
    # The three LAP bytes only differ by index and default value.
    fields_desc = [
        XByteField("LAP%d" % index, default)
        for index, default in enumerate((0, 0x8B, 0x9E))
    ] + [
        ByteField("length", 1),
        ByteField("max_responses", 0),
    ]
102
103
104""" END SCAPY stuff"""
105
106
class Connection(object):
    """Simple wrapper class for a TCP socket connected to localhost.

    Attributes:
      _socket: The underlying socket connected to ("localhost", port).
    """

    def __init__(self, port):
        """Open a TCP (IPv4) connection to the given port on localhost.

        Args:
          port: The TCP port number to connect to.

        Raises:
          socket.error: If the connection cannot be established.
        """
        self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            self._socket.connect(("localhost", port))
        except Exception:
            # Do not leak the file descriptor when the connection fails.
            self._socket.close()
            raise

    def close(self):
        """Close the underlying socket."""
        self._socket.close()

    def send(self, data):
        """Send all of the given bytes over the socket."""
        self._socket.sendall(data)

    def receive(self, size):
        """Receive and return up to `size` bytes from the socket."""
        return self._socket.recv(size)
126
127
class RawPort(object):
    """Converts outgoing packets to binary and sends them.

    Attributes:
      _connection: The Connection to the HCI port.
    """

    def __init__(self, port):
        """Connect to the given port.

        Args:
          port: The TCP port number handed to Connection.
        """
        self._connection = Connection(port)

    def close(self):
        """Close the underlying connection."""
        self._connection.close()

    def send_binary(self, args):
        """Concatenate hex strings, convert them to bytes and send them.

        Args:
          args: An iterable of strings of hexadecimal digits; they are joined
            without a separator before conversion.

        Raises:
          binascii.Error: If the joined string is not valid hexadecimal
            (non-hex characters or an odd number of digits).
        """
        joined_args = "".join(args)
        print(joined_args)
        packet = binascii.a2b_hex(joined_args)
        self._connection.send(packet)

    def receive_response(self, timeout=1.5):
        """Wait for data on the connection and return what was read.

        Args:
          timeout: Maximum number of seconds to wait for the socket to become
            readable.

        Returns:
          The bytes read (at most 512), or False if the socket reported an
          error condition or nothing arrived before the timeout.
        """
        # Imported here so the name is always the stdlib module, no matter
        # what `from scapy.all import *` binds to `select` at module level.
        import select

        sock = self._connection._socket
        ready_to_read, _, in_error = select.select([sock], [], [sock], timeout)
        if in_error:
            print("Error")
            return False
        if ready_to_read:
            print("Ready to Read")
            type_str = self._connection.receive(512)
            print(len(type_str))
            print(type_str)
            return type_str
        print("Returning false at the end")
        return False
165
166
class RawPortShell(cmd.Cmd):
    """Shell for sending binary data to a port.

    Each input line is dispatched to the matching do_* method. After every
    command that does not stop the shell, postcmd() asks the port for a
    response and prints it.
    """

    def __init__(self, raw_port):
        """Create a shell bound to raw_port.

        Args:
          raw_port: Object providing send_binary(), receive_response() and
            close(), e.g. a RawPort.
        """
        cmd.Cmd.__init__(self)
        self._raw_port = raw_port

    def _send_hex(self, args):
        """Send the whitespace-separated hex strings in args to the port.

        Malformed hex is reported to the user instead of propagating, so a
        typo does not terminate the interactive shell.
        """
        try:
            self._raw_port.send_binary(args.split())
        except ValueError as error:
            # binascii.Error (bad digit / odd length) is a ValueError.
            print("Invalid hex data: %s" % error)

    def do_send(self, args):
        """Arguments: hex [hex ...] Send the given hexadecimal bytes to the port.

        The arguments are joined without separators before being converted.
        """
        self._send_hex(args)

    def do_scan(self, args):
        """Arguments: hex [hex ...] Send the given hexadecimal bytes to the port.

        Currently behaves exactly like 'send'.
        """
        self._send_hex(args)

    def do_quit(self, args):
        """Arguments: None.

        Close the port and exit the shell.
        """
        self._raw_port.close()
        print("Goodbye.")
        return True

    def do_help(self, args):
        """Arguments: [command] List the available commands, or show the help for one command.

        """
        cmd.Cmd.do_help(self, args)

    def postcmd(self, stop, line):
        """Called after each command; print the port's response unless stopping.

        Args:
          stop: Whether the command asked the shell to stop.
          line: The input line that was just executed.

        Returns:
          True to stop the command loop, False to continue.
        """
        if stop:
            return True
        response = self._raw_port.receive_response()
        print(response)
        return False
213
214
def main(argv):
    """Parse the port from argv, connect to it and run the interactive shell.

    Args:
      argv: The full argument vector; argv[1] must be the TCP port number.

    Every failure (wrong argument count, unparsable port, connection error)
    is reported on stdout and the function returns without raising.
    """
    if len(argv) != 2:
        print("Usage: python raw_port.py [port]")
        return
    try:
        port = int(argv[1])
    except ValueError:
        print("Error parsing port.")
        return

    try:
        raw_port = RawPort(port)
    except socket.error as e:
        print("Error connecting to socket: %s" % e)
        return
    except Exception:
        print("Error creating (check arguments).")
        return

    raw_port_shell = RawPortShell(raw_port)
    raw_port_shell.prompt = "$ "
    try:
        raw_port_shell.cmdloop("Welcome to the RootCanal Console \n" + 'Type \'help\' for more information.')
    finally:
        # Release the port even if the shell exits abnormally (e.g. Ctrl-C).
        raw_port.close()


if __name__ == "__main__":
    main(sys.argv)
238