• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#
2# Copyright 2018 The Android Open Source Project
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16"""Script for sending data to a port.
17
18This script provides a simple shell interface for sending data at run-time to a
19port.
20
21Usage:
22    1. Choose a port to use. Use 'adb forward tcp:<port>
23    tcp:<port>' to forward the port to the device.
24    2. In a separate shell, build and push the test vendor library to the device
25    using the script mentioned in option A (i.e. without the --test-channel flag
26    set).
27    3. Once logcat has started, turn Bluetooth on from the device.
28    4. Run this program, in the shell from step 1,  the port, also from step 1,
29    as arguments.
30
31
32     length   type   source    dest      addr_type  adv_type     EIR Data
33
34              adv                          public  connectable  Name (TATA)
35send 18000000 07 010203040506 000000000000   00      00         050954415441
36                                                                020106
37
38     length   adv   source    dest        public   non-conn     Name (TETE)
39send 18000000 07 010203040507 000000000000   00       03        050954455445
40                                                                020106
41
42     length scan_rsp  source    dest        public  scan_rsp    Name (TFTF)
43send 18000000 09 010203040506 4de24c67454b    00      04        050954465446
44                                                                020106
45
46"""
47
48#!/usr/bin/env python3
49
50import binascii
51import cmd
52import random
53import socket
54import string
55import struct
56import sys
57
58DEVICE_NAME_LENGTH = 6
59DEVICE_ADDRESS_LENGTH = 6
60
61
62# Used to generate fake device names and addresses during discovery.
63def generate_random_name():
64    return ''.join(random.SystemRandom().choice(string.ascii_uppercase + \
65      string.digits) for _ in range(DEVICE_NAME_LENGTH))
66
67
68def generate_random_address():
69    return ''.join(random.SystemRandom().choice(string.digits) for _ in \
70      range(DEVICE_ADDRESS_LENGTH))
71
72
73class Connection(object):
74    """Simple wrapper class for a socket object.
75
76  Attributes:
77    socket: The underlying socket created for the specified address and port.
78  """
79
80    def __init__(self, port):
81        self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
82        self._socket.connect(('localhost', port))
83        self._socket.setblocking(0)
84
85    def close(self):
86        self._socket.close()
87
88    def send(self, data):
89        self._socket.sendall(data)
90
91    def receive(self, size):
92        return self._socket.recv(size)
93
94
95class RawPort(object):
96    """Checks outgoing commands and sends them once verified.
97
98  Attributes:
99    connection: The connection to the HCI port.
100  """
101
102    def __init__(self, port):
103        self._connection = Connection(port)
104        self._closed = False
105
106    def close(self):
107        self._connection.close()
108        self._closed = True
109
110    def send_binary(self, args):
111        joined_args = ''.join(arg for arg in args)
112        print(joined_args)
113        packet = binascii.a2b_hex(joined_args)
114        if self._closed:
115            return
116        self._connection.send(packet)
117
118    def receive_response(self):
119        if self._closed:
120            return
121        size_chars = self._connection.receive(4)
122        if not size_chars:
123            print('Debug: No response')
124            return False
125        size_bytes = bytearray(size_chars)
126        response_size = 0
127        for i in range(0, len(size_chars) - 1):
128            response_size |= ord(size_chars[i]) << (8 * i)
129        response = self._connection.receive(response_size)
130        return response
131
132    def lint_command(self, name, args, name_size, args_size):
133        assert name_size == len(name) and args_size == len(args)
134        try:
135            name.encode('utf-8')
136            for arg in args:
137                arg.encode('utf-8')
138        except UnicodeError:
139            print('Unrecognized characters.')
140            raise
141        if name_size > 255 or args_size > 255:
142            raise ValueError  # Size must be encodable in one octet.
143        for arg in args:
144            if len(arg) > 255:
145                raise ValueError  # Size must be encodable in one octet.
146
147
148class RawPortShell(cmd.Cmd):
149    """Shell for sending binary data to a port."""
150
151    def __init__(self, raw_port):
152        cmd.Cmd.__init__(self)
153        self._raw_port = raw_port
154
155    def do_send(self, args):
156        """Arguments: dev_type_str Add a new device of type dev_type_str."""
157        self._raw_port.send_binary(args.split())
158
159    def do_quit(self, args):
160        """Arguments: None.
161
162    Exits.
163    """
164        self._raw_port.close()
165        print('Goodbye.')
166        return True
167
168    def do_help(self, args):
169        """Arguments: [dev_num [attr]] List the commands available, optionally filtered by device and attr."""
170        if (len(args) == 0):
171            cmd.Cmd.do_help(self, args)
172
173
174def main(argv):
175    if len(argv) != 2:
176        print('Usage: python raw_port.py [port]')
177        return
178    try:
179        port = int(argv[1])
180    except ValueError:
181        print('Error parsing port.')
182    else:
183        try:
184            raw_port = RawPort(port)
185        except (socket.error, e):
186            print('Error connecting to socket: %s' % e)
187        except:
188            print('Error creating (check arguments).')
189        else:
190            raw_port_shell = RawPortShell(raw_port)
191            raw_port_shell.prompt = '$ '
192            raw_port_shell.cmdloop('Welcome to the RootCanal Console \n' + 'Type \'help\' for more information.')
193
194
195if __name__ == '__main__':
196    main(sys.argv)
197