• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2#
3# Copyright 2017 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#      http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16
17import os
18import logging
19import pandas as pd
20import time
21import acts_contrib.test_utils.bt.bt_test_utils as btutils
22from acts_contrib.test_utils.bt.BluetoothBaseTest import BluetoothBaseTest
23from acts_contrib.test_utils.bt.ble_performance_test_utils import plot_graph
24from acts_contrib.test_utils.power.PowerBTBaseTest import ramp_attenuation
25from acts_contrib.test_utils.bt.bt_test_utils import setup_multiple_devices_for_bt_test
26from acts_contrib.test_utils.bt.bt_test_utils import orchestrate_rfcomm_connection
27from acts.signals import TestPass
28from acts import utils
29from acts_contrib.test_utils.bt.bt_test_utils import write_read_verify_data
30
31INIT_ATTEN = 0
32WRITE_ITERATIONS = 500
33
34
35class BtRfcommThroughputRangeTest(BluetoothBaseTest):
36    def __init__(self, configs):
37        super().__init__(configs)
38        req_params = ['attenuation_vector', 'system_path_loss']
39        #'attenuation_vector' is a dict containing: start, stop and step of
40        #attenuation changes
41        self.unpack_userparams(req_params)
42
43    def setup_class(self):
44        super().setup_class()
45        self.dut = self.android_devices[0]
46        self.remote_device = self.android_devices[1]
47        btutils.enable_bqr(self.android_devices)
48        if hasattr(self, 'attenuators'):
49            self.attenuator = self.attenuators[0]
50            self.attenuator.set_atten(INIT_ATTEN)
51        self.attenuation_range = range(self.attenuation_vector['start'],
52                                       self.attenuation_vector['stop'] + 1,
53                                       self.attenuation_vector['step'])
54        self.log_path = os.path.join(logging.log_path, 'results')
55        os.makedirs(self.log_path, exist_ok=True)
56        return setup_multiple_devices_for_bt_test(self.android_devices)
57
58    def teardown_test(self):
59        self.dut.droid.bluetoothSocketConnStop()
60        self.remote_device.droid.bluetoothSocketConnStop()
61        if hasattr(self, 'attenuator'):
62            self.attenuator.set_atten(INIT_ATTEN)
63
64    def test_rfcomm_throughput_range(self):
65        data_points = []
66        message = "x" * 990
67        self.file_output = os.path.join(
68            self.log_path, '{}.csv'.format(self.current_test_name))
69        if not orchestrate_rfcomm_connection(self.dut, self.remote_device):
70            return False
71        self.log.info("RFCOMM Connection established")
72        for atten in self.attenuation_range:
73            ramp_attenuation(self.attenuator, atten)
74            self.log.info('Set attenuation to %d dB', atten)
75            process_data_dict = btutils.get_bt_metric(self.dut)
76            rssi_primary = process_data_dict.get('rssi')
77            pwlv_primary = process_data_dict.get('pwlv')
78            rssi_primary = rssi_primary.get(self.dut.serial)
79            pwlv_primary = pwlv_primary.get(self.dut.serial)
80            self.log.info("DUT RSSI:{} and PwLv:{} with attenuation:{}".format(
81                rssi_primary, pwlv_primary, atten))
82            if type(rssi_primary) != str:
83                data_rate = self.write_read_verify_rfcommdata(
84                    self.dut, self.remote_device, message)
85                data_point = {
86                    'attenuation_db': atten,
87                    'Dut_RSSI': rssi_primary,
88                    'DUT_PwLv': pwlv_primary,
89                    'Pathloss': atten + self.system_path_loss,
90                    'RfcommThroughput': data_rate
91                }
92                data_points.append(data_point)
93                df = pd.DataFrame(data_points)
94                # bokeh data for generating BokehFigure
95                bokeh_data = {
96                    'x_label': 'Pathloss (dBm)',
97                    'primary_y_label': 'RSSI (dBm)',
98                    'log_path': self.log_path,
99                    'current_test_name': self.current_test_name
100                }
101                # plot_data for adding line to existing BokehFigure
102                plot_data = {
103                    'line_one': {
104                        'x_column': 'Pathloss',
105                        'y_column': 'Dut_RSSI',
106                        'legend': 'DUT RSSI (dBm)',
107                        'marker': 'circle_x',
108                        'y_axis': 'default'
109                    },
110                    'line_two': {
111                        'x_column': 'Pathloss',
112                        'y_column': 'RfcommThroughput',
113                        'legend': 'RFCOMM Throughput (bits/sec)',
114                        'marker': 'hex',
115                        'y_axis': 'secondary'
116                    }
117                }
118            else:
119                df.to_csv(self.file_output, index=False)
120                plot_graph(df,
121                           plot_data,
122                           bokeh_data,
123                           secondary_y_label='RFCOMM Throughput (bits/sec)')
124                raise TestPass("Reached RFCOMM Max Range,RFCOMM disconnected.")
125        # Save Data points to csv
126        df.to_csv(self.file_output, index=False)
127        # Plot graph
128        plot_graph(df,
129                   plot_data,
130                   bokeh_data,
131                   secondary_y_label='RFCOMM Throughput (bits/sec)')
132        self.dut.droid.bluetoothRfcommStop()
133        self.remote_device.droid.bluetoothRfcommStop()
134        return True
135
136    def write_read_verify_rfcommdata(self, dut, remote_device, msg):
137        """Verify that the client wrote data to the remote Android device correctly.
138
139        Args:
140            dut: the Android device to perform the write.
141            remote_device: the Android device to read the data written.
142            msg: the message to write.
143        Returns:
144            True if the data written matches the data read, false if not.
145        """
146        start_write_time = time.perf_counter()
147        for n in range(WRITE_ITERATIONS):
148            try:
149                dut.droid.bluetoothSocketConnWrite(msg)
150            except Exception as err:
151                dut.log.error("Failed to write data: {}".format(err))
152                return False
153            try:
154                read_msg = remote_device.droid.bluetoothSocketConnRead()
155            except Exception as err:
156                remote_device.log.error("Failed to read data: {}".format(err))
157                return False
158            if msg != read_msg:
159                self.log.error("Mismatch! Read: {}, Expected: {}".format(
160                    read_msg, msg))
161                return False
162        end_read_time = time.perf_counter()
163        total_num_bytes = 990 * WRITE_ITERATIONS
164        test_time = (end_read_time - start_write_time)
165        if (test_time == 0):
166            dut.log.error("Buffer transmits cannot take zero time")
167            return 0
168        data_rate = (1.000 * total_num_bytes) / test_time
169        self.log.info(
170            "Calculated using total write and read times: total_num_bytes={}, "
171            "test_time={}, data rate={:08.0f} bytes/sec, {:08.0f} bits/sec".
172            format(total_num_bytes, test_time, data_rate, (data_rate * 8)))
173        data_rate = data_rate * 8
174        return data_rate
175