#!/usr/bin/env python3 # # Copyright 2017 The Android Open Source Project # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """ Test script to execute BLE connection,run data traffic and calculating RSSI value of the remote BLE device. """ import os import logging import pandas as pd import numpy as np import time import acts_contrib.test_utils.bt.bt_test_utils as btutils import acts_contrib.test_utils.wifi.wifi_performance_test_utils.bokeh_figure as bokeh_figure from acts_contrib.test_utils.bt.ble_performance_test_utils import ble_coc_connection from acts_contrib.test_utils.bt.ble_performance_test_utils import ble_gatt_disconnection from acts_contrib.test_utils.bt.ble_performance_test_utils import start_advertising_and_scanning from acts_contrib.test_utils.bt.BluetoothBaseTest import BluetoothBaseTest from acts_contrib.test_utils.bt.bt_test_utils import cleanup_scanners_and_advertisers from acts_contrib.test_utils.bt.ble_performance_test_utils import establish_ble_connection from acts_contrib.test_utils.bt.bt_constants import l2cap_max_inactivity_delay_after_disconnect from acts_contrib.test_utils.bt.ble_performance_test_utils import run_ble_throughput from acts_contrib.test_utils.bt.ble_performance_test_utils import read_ble_rssi from acts_contrib.test_utils.bt.ble_performance_test_utils import read_ble_scan_rssi from acts_contrib.test_utils.bt.bt_test_utils import reset_bluetooth from acts_contrib.test_utils.power.PowerBTBaseTest import ramp_attenuation from acts_contrib.test_utils.bt.bt_test_utils import setup_multiple_devices_for_bt_test from acts.signals import TestPass from acts import utils INIT_ATTEN = 0 MAX_RSSI = 92 class BleRangeTest(BluetoothBaseTest): active_adv_callback_list = [] active_scan_callback_list = [] def __init__(self, configs): super().__init__(configs) req_params = ['attenuation_vector', 'system_path_loss'] #'attenuation_vector' is a dict containing: start, stop and step of #attenuation changes self.unpack_userparams(req_params) def setup_class(self): super().setup_class() self.client_ad = self.android_devices[0] # The client which is scanning will need location to be enabled in order to # start scan and get scan results. utils.set_location_service(self.client_ad, True) self.server_ad = self.android_devices[1] # Note that some tests required a third device. if hasattr(self, 'attenuators'): self.attenuator = self.attenuators[0] self.attenuator.set_atten(INIT_ATTEN) self.attenuation_range = range(self.attenuation_vector['start'], self.attenuation_vector['stop'] + 1, self.attenuation_vector['step']) self.log_path = os.path.join(logging.log_path, 'results') os.makedirs(self.log_path, exist_ok=True) # BokehFigure object self.plot = bokeh_figure.BokehFigure( title='{}'.format(self.current_test_name), x_label='Pathloss (dB)', primary_y_label='BLE RSSI (dBm)', secondary_y_label='DUT Tx Power (dBm)', axis_label_size='16pt') if len(self.android_devices) > 2: self.server2_ad = self.android_devices[2] btutils.enable_bqr(self.android_devices) return setup_multiple_devices_for_bt_test(self.android_devices) def teardown_test(self): self.client_ad.droid.bluetoothSocketConnStop() self.server_ad.droid.bluetoothSocketConnStop() if hasattr(self, 'attenuator'): self.attenuator.set_atten(INIT_ATTEN) # Give sufficient time for the physical LE link to be disconnected. time.sleep(l2cap_max_inactivity_delay_after_disconnect) cleanup_scanners_and_advertisers(self.client_ad, self.active_scan_callback_list, self.server_ad, self.active_adv_callback_list) def test_ble_gatt_connection_range(self): """Test GATT connection over LE and read RSSI. Test will establish a gatt connection between a GATT server and GATT client then read the RSSI for each attenuation until the BLE link get disconnect Expected Result: Verify that a connection was established and then disconnected successfully. Verify that the RSSI was read correctly. """ attenuation = [] ble_rssi = [] dut_pwlv = [] path_loss = [] bluetooth_gatt, gatt_callback, adv_callback, gatt_server = establish_ble_connection( self.client_ad, self.server_ad) for atten in self.attenuation_range: ramp_attenuation(self.attenuator, atten) self.log.info('Set attenuation to %d dB', atten) rssi_primary, pwlv_primary = self.get_ble_rssi_and_pwlv() self.log.info( "Dut BLE RSSI:{} and Pwlv:{} with attenuation:{}".format( rssi_primary, pwlv_primary, atten)) rssi = self.client_ad.droid.gattClientReadRSSI(gatt_server) if type(rssi_primary) != str: attenuation.append(atten) ble_rssi.append(rssi_primary) dut_pwlv.append(pwlv_primary) path_loss.append(atten + self.system_path_loss) df = pd.DataFrame({ 'Attenuation': attenuation, 'BLE_RSSI': ble_rssi, 'Dut_PwLv': dut_pwlv, 'Pathloss': path_loss }) filepath = os.path.join( self.log_path, '{}.csv'.format(self.current_test_name)) else: self.plot_ble_graph(df) df.to_csv(filepath, encoding='utf-8') raise TestPass('Reached BLE Max Range, BLE Gatt disconnected') ble_gatt_disconnection(self.client_ad, bluetooth_gatt, gatt_callback) self.plot_ble_graph(df) df.to_csv(filepath, encoding='utf-8') self.server_ad.droid.bleStopBleAdvertising(adv_callback) return True def test_ble_coc_throughput_range(self): """Test LE CoC data transfer and read RSSI with each attenuation Test will establish a L2CAP CoC connection between client and server then start BLE date transfer and read the RSSI for each attenuation until the BLE link get disconnect Expected Result: BLE data transfer successful and Read RSSi Value of the server """ attenuation = [] ble_rssi = [] throughput = [] dut_pwlv = [] path_loss = [] self.plot_throughput = bokeh_figure.BokehFigure( title='{}'.format(self.current_test_name), x_label='Pathloss (dB)', primary_y_label='BLE Throughput (bits per sec)', axis_label_size='16pt') status, gatt_callback, gatt_server, bluetooth_gatt, client_conn_id = ble_coc_connection( self.server_ad, self.client_ad) for atten in self.attenuation_range: ramp_attenuation(self.attenuator, atten) self.log.info('Set attenuation to %d dB', atten) datarate = run_ble_throughput(self.client_ad, client_conn_id, self.server_ad) rssi_primary, pwlv_primary = self.get_ble_rssi_and_pwlv() self.log.info( "BLE RSSI is:{} dBm and Tx Power:{} with attenuation {} dB with throughput:{}bits per sec" .format(rssi_primary, pwlv_primary, atten, datarate)) if type(rssi_primary) != str: attenuation.append(atten) ble_rssi.append(rssi_primary) dut_pwlv.append(pwlv_primary) throughput.append(datarate) path_loss.append(atten + self.system_path_loss) df = pd.DataFrame({ 'Attenuation': attenuation, 'BLE_RSSI': ble_rssi, 'Dut_PwLv': dut_pwlv, 'Throughput': throughput, 'Pathloss': path_loss }) filepath = os.path.join( self.log_path, '{}.csv'.format(self.current_test_name)) results_file_path = os.path.join( self.log_path, '{}_throughput.html'.format(self.current_test_name)) self.plot_throughput.add_line(df['Pathloss'], df['Throughput'], legend='BLE Throughput', marker='square_x') else: self.plot_ble_graph(df) self.plot_throughput.generate_figure() bokeh_figure.BokehFigure.save_figures([self.plot_throughput], results_file_path) df.to_csv(filepath, encoding='utf-8') raise TestPass('Reached BLE Max Range, BLE Gatt disconnected') self.plot_ble_graph(df) self.plot_throughput.generate_figure() bokeh_figure.BokehFigure.save_figures([self.plot_throughput], results_file_path) df.to_csv(filepath, encoding='utf-8') ble_gatt_disconnection(self.server_ad, bluetooth_gatt, gatt_callback) return True def test_ble_scan_remote_rssi(self): data_points = [] for atten in self.attenuation_range: csv_path = os.path.join( self.log_path, '{}_attenuation_{}.csv'.format(self.current_test_name, atten)) ramp_attenuation(self.attenuator, atten) self.log.info('Set attenuation to %d dB', atten) adv_callback, scan_callback = start_advertising_and_scanning( self.client_ad, self.server_ad, Legacymode=False) self.active_adv_callback_list.append(adv_callback) self.active_scan_callback_list.append(scan_callback) average_rssi, raw_rssi, timestamp = read_ble_scan_rssi( self.client_ad, scan_callback) self.log.info( "Scanned rssi list of the remote device is :{}".format( raw_rssi)) self.log.info( "BLE RSSI of the remote device is:{} dBm".format(average_rssi)) min_rssi = min(raw_rssi) max_rssi = max(raw_rssi) path_loss = atten + self.system_path_loss std_deviation = np.std(raw_rssi) data_point = { 'Attenuation': atten, 'BLE_RSSI': average_rssi, 'Pathloss': path_loss, 'Min_RSSI': min_rssi, 'Max_RSSI': max_rssi, 'Standard_deviation': std_deviation } data_points.append(data_point) df = pd.DataFrame({'timestamp': timestamp, 'raw rssi': raw_rssi}) df.to_csv(csv_path, encoding='utf-8', index=False) try: self.server_ad.droid.bleAdvSetStopAdvertisingSet(adv_callback) except Exception as err: self.log.warning( "Failed to stop advertisement: {}".format(err)) reset_bluetooth([self.server_ad]) self.client_ad.droid.bleStopBleScan(scan_callback) filepath = os.path.join( self.log_path, '{}_summary.csv'.format(self.current_test_name)) ble_df = pd.DataFrame(data_points) ble_df.to_csv(filepath, encoding='utf-8') return True def plot_ble_graph(self, df): """ Plotting BLE RSSI and Throughput with Attenuation. Args: df: Summary of results contains attenuation, BLE_RSSI and Throughput """ self.plot.add_line(df['Pathloss'], df['BLE_RSSI'], legend='DUT BLE RSSI (dBm)', marker='circle_x') self.plot.add_line(df['Pathloss'], df['Dut_PwLv'], legend='DUT TX Power (dBm)', marker='hex', y_axis='secondary') results_file_path = os.path.join( self.log_path, '{}.html'.format(self.current_test_name)) self.plot.generate_figure() bokeh_figure.BokehFigure.save_figures([self.plot], results_file_path) def get_ble_rssi_and_pwlv(self): process_data_dict = btutils.get_bt_metric(self.client_ad) rssi_primary = process_data_dict.get('rssi') pwlv_primary = process_data_dict.get('pwlv') rssi_primary = rssi_primary.get(self.client_ad.serial) pwlv_primary = pwlv_primary.get(self.client_ad.serial) return rssi_primary, pwlv_primary