• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#
2# Copyright 2015 The Android Open Source Project
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16"""Script for sending testing parameters and commands to a Bluetooth device.
17
18This script provides a simple shell interface for sending data at run-time to a
19Bluetooth device. It is intended to be used in tandem with the test vendor
20library project.
21
22Usage:
23  Option A: Script
24    1. Run build_and_run.sh in scripts/ with the --test-channel flag set and the
25    port to use for the test channel.
26  Option B: Manual
27    1. Choose a port to use for the test channel. Use 'adb forward tcp:<port>
28    tcp:<port>' to forward the port to the device.
29    2. In a separate shell, build and push the test vendor library to the device
30    using the script mentioned in option A (i.e. without the --test-channel flag
31    set).
32    3. Once logcat has started, turn Bluetooth on from the device.
33    4. Run this program, in the shell from step 1,  the port, also from step 1,
34    as arguments.
35"""
36
37#!/usr/bin/env python3
38
39import cmd
40import random
41import socket
42import string
43import struct
44import sys
45import time
46
47DEVICE_NAME_LENGTH = 6
48DEVICE_ADDRESS_LENGTH = 6
49
50
51# Used to generate fake device names and addresses during discovery.
52def generate_random_name():
53    return ''.join(random.SystemRandom().choice(string.ascii_uppercase + \
54      string.digits) for _ in range(DEVICE_NAME_LENGTH))
55
56
57def generate_random_address():
58    return ''.join(random.SystemRandom().choice(string.digits) for _ in \
59      range(DEVICE_ADDRESS_LENGTH))
60
61
62class Connection(object):
63    """Simple wrapper class for a socket object.
64
65  Attributes:
66    socket: The underlying socket created for the specified address and port.
67  """
68
69    def __init__(self, port):
70        self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
71        self._socket.connect(('localhost', port))
72
73    def close(self):
74        self._socket.close()
75
76    def send(self, data):
77        self._socket.sendall(data.encode())
78
79    def receive(self, size):
80        return self._socket.recv(size)
81
82
83class TestChannel(object):
84    """Checks outgoing commands and sends them once verified.
85
86  Attributes:
87    connection: The connection to the test vendor library that commands are sent
88      on.
89  """
90
91    def __init__(self, port):
92        self._connection = Connection(port)
93        self._closed = False
94
95    def close(self):
96        self._connection.close()
97        self._closed = True
98
99    def send_command(self, name, args):
100        name_size = len(name)
101        args_size = len(args)
102        self.lint_command(name, args, name_size, args_size)
103        encoded_name = chr(name_size) + name
104        encoded_args = chr(args_size) + ''.join(chr(len(arg)) + arg for arg in args)
105        command = encoded_name + encoded_args
106        if self._closed:
107            return
108        self._connection.send(command)
109        if name != 'CLOSE_TEST_CHANNEL':
110            print(self.receive_response().decode())
111
112    def receive_response(self):
113        if self._closed:
114            return b'Closed'
115        size_chars = self._connection.receive(4)
116        size_bytes = bytearray(size_chars)
117        if not size_chars:
118            return b'No response, assuming that the connection is broken'
119        response_size = 0
120        for i in range(0, len(size_chars) - 1):
121            response_size |= (size_chars[i] << (8 * i))
122        response = self._connection.receive(response_size)
123        return response
124
125    def lint_command(self, name, args, name_size, args_size):
126        assert name_size == len(name) and args_size == len(args)
127        try:
128            name.encode()
129            for arg in args:
130                arg.encode()
131        except UnicodeError:
132            print('Unrecognized characters.')
133            raise
134        if name_size > 255 or args_size > 255:
135            raise ValueError  # Size must be encodable in one octet.
136        for arg in args:
137            if len(arg) > 255:
138                raise ValueError  # Size must be encodable in one octet.
139
140
141class TestChannelShell(cmd.Cmd):
142    """Shell for sending test channel data to controller.
143
144  Manages the test channel to the controller and defines a set of commands the
145  user can send to the controller as well. These commands are processed parallel
146  to commands sent from the device stack and used to provide additional
147  debugging/testing capabilities.
148
149  Attributes:
150    test_channel: The communication channel to send data to the controller.
151  """
152
153    def __init__(self, test_channel):
154        cmd.Cmd.__init__(self)
155        self._test_channel = test_channel
156
157    def do_add(self, args):
158        """Arguments: dev_type_str Add a new device of type dev_type_str.
159
160    """
161        self._test_channel.send_command('add', args.split())
162
163    def do_del(self, args):
164        """Arguments: device index Delete the device with the specified index.
165
166    """
167        self._test_channel.send_command('del', args.split())
168
169    def do_add_phy(self, args):
170        """Arguments: dev_type_str Add a new device of type dev_type_str.
171
172    """
173        self._test_channel.send_command('add_phy', args.split())
174
175    def do_del_phy(self, args):
176        """Arguments: phy index Delete the phy with the specified index.
177
178    """
179        self._test_channel.send_command('del_phy', args.split())
180
181    def do_add_device_to_phy(self, args):
182        """Arguments: device index phy index Add a new device of type dev_type_str.
183
184    """
185        self._test_channel.send_command('add_device_to_phy', args.split())
186
187    def do_del_device_from_phy(self, args):
188        """Arguments: phy index Delete the phy with the specified index.
189
190    """
191        self._test_channel.send_command('del_device_from_phy', args.split())
192
193    def do_add_remote(self, args):
194        """Arguments: dev_type_str Connect to a remote device at arg1@arg2.
195
196    """
197        self._test_channel.send_command('add_remote', args.split())
198
199    def do_get(self, args):
200        """Arguments: dev_num attr_str Get the value of the attribute attr_str from device dev_num.
201
202    """
203        self._test_channel.send_command('get', args.split())
204
205    def do_set(self, args):
206        """Arguments: dev_num attr_str val Set the value of the attribute attr_str from device dev_num equal to val.
207
208    """
209        self._test_channel.send_command('set', args.split())
210
211    def do_set_device_address(self, args):
212        """Arguments: dev_num addr Set the address of device dev_num equal to addr.
213
214    """
215        self._test_channel.send_command('set_device_address', args.split())
216
217    def do_list(self, args):
218        """Arguments: [dev_num [attr]] List the devices from the controller, optionally filtered by device and attr.
219
220    """
221        self._test_channel.send_command('list', args.split())
222
223    def do_set_timer_period(self, args):
224        """Arguments: period_ms Set the timer to fire every period_ms milliseconds
225    """
226        self._test_channel.send_command('set_timer_period', args.split())
227
228    def do_start_timer(self, args):
229        """Arguments: None. Start the timer.
230    """
231        self._test_channel.send_command('start_timer', args.split())
232
233    def do_stop_timer(self, args):
234        """Arguments: None. Stop the timer.
235    """
236        self._test_channel.send_command('stop_timer', args.split())
237
238    def do_wait(self, args):
239        """Arguments: time in seconds (float).
240    """
241        sleep_time = float(args.split()[0])
242        time.sleep(sleep_time)
243
244    def do_reset(self, args):
245        """Arguments: None.
246
247    Resets the simulation.
248    """
249        self._test_channel.send_command('reset', [])
250
251    def do_end(self, args):
252        """Arguments: None.
253
254    Ends the simulation and exits.
255    """
256        self._test_channel.send_command('END_SIMULATION', [])
257        print('Goodbye.')
258        return True
259
260    def do_quit(self, args):
261        """Arguments: None.
262
263    Exits the test channel.
264    """
265        self._test_channel.send_command('CLOSE_TEST_CHANNEL', [])
266        self._test_channel.close()
267        print('Goodbye.')
268        return True
269
270    def do_help(self, args):
271        """Arguments: [dev_num [attr]] List the commands available, optionally filtered by device and attr.
272
273    """
274        if (len(args) == 0):
275            cmd.Cmd.do_help(self, args)
276        else:
277            self._test_channel.send_command('help', args.split())
278
279    def preloop(self):
280        """Clear out the buffer
281
282    """
283        response = self._test_channel.receive_response()
284
285    #def postcmd(self, stop, line):
286    #"""
287    #Called after each command
288    #stop : whether we will stop after this command
289    #line : the previous input line
290    #Return True to stop, False to continue
291    #"""
292    #if stop:
293    #return True
294    #response = self._test_channel.receive_response()
295    #if not response:
296    #return True
297    #print response
298    #return False
299
300
301def main(argv):
302    if len(argv) != 2:
303        print('Usage: python test_channel.py [port]')
304        return
305    try:
306        port = int(argv[1])
307    except ValueError:
308        print('Error parsing port.')
309    else:
310        try:
311            test_channel = TestChannel(port)
312        except socket.error as e:
313            print('Error connecting to socket: %s' % e)
314        except:
315            print('Error creating test channel (check argument).')
316        else:
317            test_channel_shell = TestChannelShell(test_channel)
318            test_channel_shell.prompt = '$ '
319            test_channel_shell.cmdloop('Welcome to the RootCanal Console \n' + 'Type \'help\' for more information.')
320
321
322if __name__ == '__main__':
323    main(sys.argv)
324