#!/usr/bin/env python3
#
# Copyright 2017 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import logging
import pandas as pd
import time
import acts_contrib.test_utils.bt.bt_test_utils as btutils
from acts_contrib.test_utils.bt.BluetoothBaseTest import BluetoothBaseTest
from acts_contrib.test_utils.bt.ble_performance_test_utils import plot_graph
from acts_contrib.test_utils.power.PowerBTBaseTest import ramp_attenuation
from acts_contrib.test_utils.bt.bt_test_utils import setup_multiple_devices_for_bt_test
from acts_contrib.test_utils.bt.bt_test_utils import orchestrate_rfcomm_connection
from acts.signals import TestPass
from acts import utils
from acts_contrib.test_utils.bt.bt_test_utils import write_read_verify_data

INIT_ATTEN = 0
WRITE_ITERATIONS = 500


class BtRfcommThroughputRangeTest(BluetoothBaseTest):
    """Measures RFCOMM throughput between two devices across attenuation steps.

    For every attenuation value in the configured range the test records the
    DUT RSSI, the DUT power level and the measured RFCOMM throughput, then
    saves the collected points to a CSV file and plots them.
    """

    def __init__(self, configs):
        """Unpack the user parameters required by this test class.

        Args:
            configs: test configuration forwarded to the base class.
        """
        super().__init__(configs)
        req_params = ['attenuation_vector', 'system_path_loss']
        #'attenuation_vector' is a dict containing: start, stop and step of
        #attenuation changes
        self.unpack_userparams(req_params)

    def setup_class(self):
        """Select the devices, reset the attenuator and prepare the log dir.

        Returns:
            The result of setup_multiple_devices_for_bt_test() for all
            android devices.
        """
        super().setup_class()
        self.dut = self.android_devices[0]
        self.remote_device = self.android_devices[1]
        btutils.enable_bqr(self.android_devices)
        if hasattr(self, 'attenuators'):
            self.attenuator = self.attenuators[0]
            self.attenuator.set_atten(INIT_ATTEN)
        # 'stop' is inclusive, hence the +1.
        self.attenuation_range = range(self.attenuation_vector['start'],
                                       self.attenuation_vector['stop'] + 1,
                                       self.attenuation_vector['step'])
        self.log_path = os.path.join(logging.log_path, 'results')
        os.makedirs(self.log_path, exist_ok=True)
        return setup_multiple_devices_for_bt_test(self.android_devices)

    def teardown_test(self):
        """Stop the socket connections and restore the initial attenuation."""
        self.dut.droid.bluetoothSocketConnStop()
        self.remote_device.droid.bluetoothSocketConnStop()
        if hasattr(self, 'attenuator'):
            self.attenuator.set_atten(INIT_ATTEN)

    def test_rfcomm_throughput_range(self):
        """Sweep the attenuation and record RSSI, power level and throughput.

        Returns:
            False if the RFCOMM connection could not be established, True
            when the whole attenuation range was swept.

        Raises:
            TestPass: when the DUT RSSI comes back as a string, which this
                test treats as the RFCOMM link having reached its maximum
                range. Results gathered so far are saved before raising.
        """
        data_points = []
        message = "x" * 990
        self.file_output = os.path.join(
            self.log_path, '{}.csv'.format(self.current_test_name))
        if not orchestrate_rfcomm_connection(self.dut, self.remote_device):
            return False
        self.log.info("RFCOMM Connection established")

        # The plot configuration does not depend on the measurements, so it is
        # built once up front. Previously it was only bound after the first
        # successful measurement, which left it (and the DataFrame) undefined
        # when the very first step already had no RSSI.
        # bokeh data for generating BokehFigure
        bokeh_data = {
            'x_label': 'Pathloss (dBm)',
            'primary_y_label': 'RSSI (dBm)',
            'log_path': self.log_path,
            'current_test_name': self.current_test_name
        }
        # plot_data for adding line to existing BokehFigure
        plot_data = {
            'line_one': {
                'x_column': 'Pathloss',
                'y_column': 'Dut_RSSI',
                'legend': 'DUT RSSI (dBm)',
                'marker': 'circle_x',
                'y_axis': 'default'
            },
            'line_two': {
                'x_column': 'Pathloss',
                'y_column': 'RfcommThroughput',
                'legend': 'RFCOMM Throughput (bits/sec)',
                'marker': 'hex',
                'y_axis': 'secondary'
            }
        }

        for atten in self.attenuation_range:
            ramp_attenuation(self.attenuator, atten)
            self.log.info('Set attenuation to %d dB', atten)
            process_data_dict = btutils.get_bt_metric(self.dut)
            rssi_primary = process_data_dict.get('rssi')
            pwlv_primary = process_data_dict.get('pwlv')
            rssi_primary = rssi_primary.get(self.dut.serial)
            pwlv_primary = pwlv_primary.get(self.dut.serial)
            self.log.info("DUT RSSI:{} and PwLv:{} with attenuation:{}".format(
                rssi_primary, pwlv_primary, atten))
            # NOTE(review): a string RSSI is presumably the placeholder
            # get_bt_metric reports when no reading is available - confirm.
            if isinstance(rssi_primary, str):
                self._save_and_plot(data_points, plot_data, bokeh_data)
                raise TestPass("Reached RFCOMM Max Range,RFCOMM disconnected.")
            data_rate = self.write_read_verify_rfcommdata(
                self.dut, self.remote_device, message)
            data_points.append({
                'attenuation_db': atten,
                'Dut_RSSI': rssi_primary,
                'DUT_PwLv': pwlv_primary,
                'Pathloss': atten + self.system_path_loss,
                'RfcommThroughput': data_rate
            })

        self._save_and_plot(data_points, plot_data, bokeh_data)
        self.dut.droid.bluetoothRfcommStop()
        self.remote_device.droid.bluetoothRfcommStop()
        return True

    def _save_and_plot(self, data_points, plot_data, bokeh_data):
        """Write the collected data points to CSV and plot them.

        Args:
            data_points: list of per-attenuation result dicts.
            plot_data: line definitions passed through to plot_graph().
            bokeh_data: figure settings passed through to plot_graph().
        """
        # Explicit columns keep the CSV header stable even with no rows.
        columns = [
            'attenuation_db', 'Dut_RSSI', 'DUT_PwLv', 'Pathloss',
            'RfcommThroughput'
        ]
        df = pd.DataFrame(data_points, columns=columns)
        # Save Data points to csv
        df.to_csv(self.file_output, index=False)
        if not data_points:
            # Nothing to draw; how plot_graph handles an empty frame is not
            # visible from this file, so the plot is skipped.
            self.log.warning("No data points collected, skipping plot")
            return
        # Plot graph
        plot_graph(df,
                   plot_data,
                   bokeh_data,
                   secondary_y_label='RFCOMM Throughput (bits/sec)')

    def write_read_verify_rfcommdata(self, dut, remote_device, msg):
        """Write msg repeatedly over RFCOMM and measure the throughput.

        The message is written by dut and read back by remote_device
        WRITE_ITERATIONS times; each read is compared with what was written.

        Args:
            dut: the Android device to perform the write.
            remote_device: the Android device to read the data written.
            msg: the message to write.
        Returns:
            The measured data rate in bits/sec. False if a write or read
            fails or the data read does not match the data written, and 0
            if the measured elapsed time is zero.
        """
        start_write_time = time.perf_counter()
        for _ in range(WRITE_ITERATIONS):
            try:
                dut.droid.bluetoothSocketConnWrite(msg)
            except Exception as err:
                dut.log.error("Failed to write data: {}".format(err))
                return False
            try:
                read_msg = remote_device.droid.bluetoothSocketConnRead()
            except Exception as err:
                remote_device.log.error("Failed to read data: {}".format(err))
                return False
            if msg != read_msg:
                self.log.error("Mismatch! Read: {}, Expected: {}".format(
                    read_msg, msg))
                return False
        end_read_time = time.perf_counter()
        # Derive the payload size from the message instead of hard-coding it,
        # so the rate stays correct if the caller changes the message length.
        total_num_bytes = len(msg) * WRITE_ITERATIONS
        test_time = (end_read_time - start_write_time)
        if (test_time == 0):
            dut.log.error("Buffer transmits cannot take zero time")
            return 0
        data_rate = (1.000 * total_num_bytes) / test_time
        self.log.info(
            "Calculated using total write and read times: total_num_bytes={}, "
            "test_time={}, data rate={:08.0f} bytes/sec, {:08.0f} bits/sec".
            format(total_num_bytes, test_time, data_rate, (data_rate * 8)))
        data_rate = data_rate * 8
        return data_rate