#!/usr/bin/env python3 # # Copyright 2017 The Android Open Source Project # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import os import logging import pandas as pd import time import acts_contrib.test_utils.bt.bt_test_utils as btutils from acts_contrib.test_utils.bt.BluetoothBaseTest import BluetoothBaseTest from acts_contrib.test_utils.bt.ble_performance_test_utils import plot_graph from acts_contrib.test_utils.power.PowerBTBaseTest import ramp_attenuation from acts_contrib.test_utils.bt.bt_test_utils import setup_multiple_devices_for_bt_test from acts_contrib.test_utils.bt.bt_test_utils import orchestrate_rfcomm_connection from acts.signals import TestPass from acts import utils from acts_contrib.test_utils.bt.bt_test_utils import write_read_verify_data INIT_ATTEN = 0 WRITE_ITERATIONS = 500 class BtRfcommThroughputRangeTest(BluetoothBaseTest): def __init__(self, configs): super().__init__(configs) req_params = ['attenuation_vector', 'system_path_loss'] #'attenuation_vector' is a dict containing: start, stop and step of #attenuation changes self.unpack_userparams(req_params) def setup_class(self): super().setup_class() self.dut = self.android_devices[0] self.remote_device = self.android_devices[1] btutils.enable_bqr(self.android_devices) if hasattr(self, 'attenuators'): self.attenuator = self.attenuators[0] self.attenuator.set_atten(INIT_ATTEN) self.attenuation_range = range(self.attenuation_vector['start'], self.attenuation_vector['stop'] + 1, self.attenuation_vector['step']) self.log_path = os.path.join(logging.log_path, 'results') os.makedirs(self.log_path, exist_ok=True) return setup_multiple_devices_for_bt_test(self.android_devices) def teardown_test(self): self.dut.droid.bluetoothSocketConnStop() self.remote_device.droid.bluetoothSocketConnStop() if hasattr(self, 'attenuator'): self.attenuator.set_atten(INIT_ATTEN) def test_rfcomm_throughput_range(self): data_points = [] message = "x" * 990 self.file_output = os.path.join( self.log_path, '{}.csv'.format(self.current_test_name)) if not orchestrate_rfcomm_connection(self.dut, self.remote_device): return False self.log.info("RFCOMM Connection established") for atten in self.attenuation_range: ramp_attenuation(self.attenuator, atten) self.log.info('Set attenuation to %d dB', atten) process_data_dict = btutils.get_bt_metric(self.dut) rssi_primary = process_data_dict.get('rssi') pwlv_primary = process_data_dict.get('pwlv') rssi_primary = rssi_primary.get(self.dut.serial) pwlv_primary = pwlv_primary.get(self.dut.serial) self.log.info("DUT RSSI:{} and PwLv:{} with attenuation:{}".format( rssi_primary, pwlv_primary, atten)) if type(rssi_primary) != str: data_rate = self.write_read_verify_rfcommdata( self.dut, self.remote_device, message) data_point = { 'attenuation_db': atten, 'Dut_RSSI': rssi_primary, 'DUT_PwLv': pwlv_primary, 'Pathloss': atten + self.system_path_loss, 'RfcommThroughput': data_rate } data_points.append(data_point) df = pd.DataFrame(data_points) # bokeh data for generating BokehFigure bokeh_data = { 'x_label': 'Pathloss (dBm)', 'primary_y_label': 'RSSI (dBm)', 'log_path': self.log_path, 'current_test_name': self.current_test_name } # plot_data for adding line to existing BokehFigure plot_data = { 'line_one': { 'x_column': 'Pathloss', 'y_column': 'Dut_RSSI', 'legend': 'DUT RSSI (dBm)', 'marker': 'circle_x', 'y_axis': 'default' }, 'line_two': { 'x_column': 'Pathloss', 'y_column': 'RfcommThroughput', 'legend': 'RFCOMM Throughput (bits/sec)', 'marker': 'hex', 'y_axis': 'secondary' } } else: df.to_csv(self.file_output, index=False) plot_graph(df, plot_data, bokeh_data, secondary_y_label='RFCOMM Throughput (bits/sec)') raise TestPass("Reached RFCOMM Max Range,RFCOMM disconnected.") # Save Data points to csv df.to_csv(self.file_output, index=False) # Plot graph plot_graph(df, plot_data, bokeh_data, secondary_y_label='RFCOMM Throughput (bits/sec)') self.dut.droid.bluetoothRfcommStop() self.remote_device.droid.bluetoothRfcommStop() return True def write_read_verify_rfcommdata(self, dut, remote_device, msg): """Verify that the client wrote data to the remote Android device correctly. Args: dut: the Android device to perform the write. remote_device: the Android device to read the data written. msg: the message to write. Returns: True if the data written matches the data read, false if not. """ start_write_time = time.perf_counter() for n in range(WRITE_ITERATIONS): try: dut.droid.bluetoothSocketConnWrite(msg) except Exception as err: dut.log.error("Failed to write data: {}".format(err)) return False try: read_msg = remote_device.droid.bluetoothSocketConnRead() except Exception as err: remote_device.log.error("Failed to read data: {}".format(err)) return False if msg != read_msg: self.log.error("Mismatch! Read: {}, Expected: {}".format( read_msg, msg)) return False end_read_time = time.perf_counter() total_num_bytes = 990 * WRITE_ITERATIONS test_time = (end_read_time - start_write_time) if (test_time == 0): dut.log.error("Buffer transmits cannot take zero time") return 0 data_rate = (1.000 * total_num_bytes) / test_time self.log.info( "Calculated using total write and read times: total_num_bytes={}, " "test_time={}, data rate={:08.0f} bytes/sec, {:08.0f} bits/sec". format(total_num_bytes, test_time, data_rate, (data_rate * 8))) data_rate = data_rate * 8 return data_rate