#!/usr/bin/env python3.4 # # Copyright 2022 - The Android Open Source Project # # Licensed under the Apache License, Version 2.0 (the 'License'); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an 'AS IS' BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import collections import csv import itertools import numpy import json import os from acts import context from acts import base_test from acts.metrics.loggers.blackbox import BlackboxMappedMetricLogger from acts_contrib.test_utils.cellular.performance import cellular_performance_test_utils as cputils from acts_contrib.test_utils.wifi import wifi_performance_test_utils as wputils from acts_contrib.test_utils.wifi.wifi_performance_test_utils.bokeh_figure import BokehFigure from acts_contrib.test_utils.cellular.performance.CellularThroughputBaseTest import CellularThroughputBaseTest from functools import partial class CellularFr2SensitivityTest(CellularThroughputBaseTest): """Class to test single cell FR1 NSA sensitivity""" def __init__(self, controllers): base_test.BaseTestClass.__init__(self, controllers) self.testcase_metric_logger = ( BlackboxMappedMetricLogger.for_test_case()) self.testclass_metric_logger = ( BlackboxMappedMetricLogger.for_test_class()) self.publish_testcase_metrics = True self.testclass_params = self.user_params['fr2_sensitivity_test_params'] self.log.info('Hello') self.tests = self.generate_test_cases( band_list=['N257', 'N258', 'N260', 'N261'], channel_list=['low', 'mid', 'high'], dl_mcs_list=list(numpy.arange(28, -1, -1)), num_dl_cells_list=[1, 2, 4, 8], orientation_list=['A_Plane', 'B_Plane'], dl_mimo_config=2, nr_ul_mcs=4, lte_dl_mcs_table='QAM256', lte_dl_mcs=4, lte_ul_mcs_table='QAM256', lte_ul_mcs=4, schedule_scenario="FULL_TPUT", schedule_slot_ratio=80, force_contiguous_nr_channel=True, transform_precoding=0, nr_dl_mcs_table='Q256', nr_ul_mcs_table='Q64') def process_testclass_results(self): # Plot individual test id results raw data and compile metrics plots = collections.OrderedDict() compiled_data = collections.OrderedDict() for testcase_name, testcase_data in self.testclass_results.items(): nr_cell_index = testcase_data['testcase_params'][ 'endc_combo_config']['lte_cell_count'] cell_config = testcase_data['testcase_params'][ 'endc_combo_config']['cell_list'][nr_cell_index] test_id = tuple(('band', cell_config['band'])) if test_id not in plots: # Initialize test id data when not present compiled_data[test_id] = { 'mcs': [], 'average_throughput': [], 'theoretical_throughput': [], 'cell_power': [], } plots[test_id] = BokehFigure( title='Band {} - BLER Curves'.format(cell_config['band']), x_label='Cell Power (dBm)', primary_y_label='BLER (Mbps)') test_id_rvr = test_id + tuple('RvR') plots[test_id_rvr] = BokehFigure( title='Band {} - RvR'.format(cell_config['band']), x_label='Cell Power (dBm)', primary_y_label='PHY Rate (Mbps)') # Compile test id data and metrics compiled_data[test_id]['average_throughput'].append( testcase_data['average_throughput_list']) compiled_data[test_id]['cell_power'].append( testcase_data['cell_power_list']) compiled_data[test_id]['mcs'].append( testcase_data['testcase_params']['nr_dl_mcs']) # Add test id to plots plots[test_id].add_line( testcase_data['cell_power_list'], testcase_data['bler_list'], 'MCS {}'.format(testcase_data['testcase_params']['nr_dl_mcs']), width=1) plots[test_id_rvr].add_line( testcase_data['cell_power_list'], testcase_data['average_throughput_list'], 'MCS {}'.format(testcase_data['testcase_params']['nr_dl_mcs']), width=1, style='dashed') for test_id, test_data in compiled_data.items(): test_id_rvr = test_id + tuple('RvR') cell_power_interp = sorted(set(sum(test_data['cell_power'], []))) average_throughput_interp = [] for mcs, cell_power, throughput in zip( test_data['mcs'], test_data['cell_power'], test_data['average_throughput']): throughput_interp = numpy.interp(cell_power_interp, cell_power[::-1], throughput[::-1]) average_throughput_interp.append(throughput_interp) rvr = numpy.max(average_throughput_interp, 0) plots[test_id_rvr].add_line(cell_power_interp, rvr, 'Rate vs. Range') figure_list = [] for plot_id, plot in plots.items(): plot.generate_figure() figure_list.append(plot) output_file_path = os.path.join(self.log_path, 'results.html') BokehFigure.save_figures(figure_list, output_file_path) """Saves CSV with all test results to enable comparison.""" results_file_path = os.path.join( context.get_current_context().get_full_output_path(), 'results.csv') with open(results_file_path, 'w', newline='') as csvfile: field_names = ['Test Name', 'Sensitivity'] writer = csv.DictWriter(csvfile, fieldnames=field_names) writer.writeheader() for testcase_name, testcase_results in self.testclass_results.items( ): row_dict = { 'Test Name': testcase_name, 'Sensitivity': testcase_results['sensitivity'] } writer.writerow(row_dict) def process_testcase_results(self): if self.current_test_name not in self.testclass_results: return testcase_data = self.testclass_results[self.current_test_name] bler_list = [] average_throughput_list = [] theoretical_throughput_list = [] nr_cell_index = testcase_data['testcase_params']['endc_combo_config'][ 'lte_cell_count'] cell_power_list = testcase_data['testcase_params']['cell_power_sweep'][ nr_cell_index] for result in testcase_data['results']: bler_list.append(result['throughput_measurements'] ['nr_bler_result']['total']['DL']['nack_ratio']) average_throughput_list.append( result['throughput_measurements']['nr_tput_result']['total'] ['DL']['average_tput']) theoretical_throughput_list.append( result['throughput_measurements']['nr_tput_result']['total'] ['DL']['theoretical_tput']) padding_len = len(cell_power_list) - len(average_throughput_list) average_throughput_list.extend([0] * padding_len) theoretical_throughput_list.extend([0] * padding_len) bler_above_threshold = [ bler > self.testclass_params['bler_threshold'] for bler in bler_list ] for idx in range(len(bler_above_threshold)): if all(bler_above_threshold[idx:]): sensitivity_idx = max(idx, 1) - 1 break else: sensitivity_idx = -1 sensitivity = cell_power_list[sensitivity_idx] self.log.info('NR Band {} MCS {} Sensitivity = {}dBm'.format( testcase_data['testcase_params']['endc_combo_config']['cell_list'] [nr_cell_index]['band'], testcase_data['testcase_params']['nr_dl_mcs'], sensitivity)) testcase_data['bler_list'] = bler_list testcase_data['average_throughput_list'] = average_throughput_list testcase_data[ 'theoretical_throughput_list'] = theoretical_throughput_list testcase_data['cell_power_list'] = cell_power_list testcase_data['sensitivity'] = sensitivity results_file_path = os.path.join( context.get_current_context().get_full_output_path(), '{}.json'.format(self.current_test_name)) with open(results_file_path, 'w') as results_file: json.dump(wputils.serialize_dict(testcase_data), results_file, indent=4) def get_per_cell_power_sweeps(self, testcase_params): # get reference test current_band = testcase_params['endc_combo_config']['cell_list'][1][ 'band'] reference_test = None reference_sensitivity = None for testcase_name, testcase_data in self.testclass_results.items(): if testcase_data['testcase_params']['endc_combo_config'][ 'cell_list'][1]['band'] == current_band: reference_test = testcase_name reference_sensitivity = testcase_data['sensitivity'] if reference_test and reference_sensitivity and not self.retry_flag: start_atten = reference_sensitivity + self.testclass_params[ 'adjacent_mcs_gap'] self.log.info( "Reference test {} found. Sensitivity {} dBm. Starting at {} dBm" .format(reference_test, reference_sensitivity, start_atten)) else: start_atten = self.testclass_params['nr_cell_power_start'] self.log.info( "Reference test not found. Starting at {} dBm".format( start_atten)) # get current cell power start nr_cell_sweep = list( numpy.arange(start_atten, self.testclass_params['nr_cell_power_stop'], self.testclass_params['nr_cell_power_step'])) lte_sweep = [self.testclass_params['lte_cell_power'] ] * len(nr_cell_sweep) cell_power_sweeps = [lte_sweep] cell_power_sweeps.extend( [nr_cell_sweep] * testcase_params['endc_combo_config']['nr_cell_count']) return cell_power_sweeps def generate_endc_combo_config(self, test_config): """Function to generate ENDC combo config from CSV test config Args: test_config: dict containing ENDC combo config from CSV Returns: endc_combo_config: dictionary with all ENDC combo settings """ endc_combo_config = collections.OrderedDict() cell_config_list = [] lte_cell_count = 1 lte_carriers = [1] lte_scc_list = [] endc_combo_config['lte_pcc'] = 1 lte_cell = { 'cell_type': 'LTE', 'cell_number': 1, 'pcc': 1, 'band': self.testclass_params['lte_anchor_band'], 'dl_bandwidth': self.testclass_params['lte_anchor_bandwidth'], 'ul_enabled': 1, 'duplex_mode': self.testclass_params['lte_anchor_duplex_mode'], 'dl_mimo_config': 'D{nss}U{nss}'.format(nss=1), 'ul_mimo_config': 'D{nss}U{nss}'.format(nss=1), 'transmission_mode': 'TM1', 'num_codewords': 1, 'num_layers': 1, 'dl_subframe_allocation': [1] * 10, } cell_config_list.append(lte_cell) nr_cell_count = 0 nr_dl_carriers = [] nr_ul_carriers = [] for nr_cell_idx in range(1, test_config['num_dl_cells'] + 1): nr_cell = { 'cell_type': 'NR5G', 'cell_number': nr_cell_idx, 'nr_cell_type': 'NSA', 'band': test_config['nr_band'], 'duplex_mode': test_config['nr_duplex_mode'], 'channel': test_config['nr_channel'], 'dl_mimo_config': 'N{nss}X{nss}'.format(nss=test_config['nr_dl_mimo_config']), 'dl_bandwidth_class': 'A', 'dl_bandwidth': test_config['nr_bandwidth'], 'ul_enabled': 1 if nr_cell_idx <= test_config['num_ul_cells'] else 0, 'ul_bandwidth_class': 'A', 'ul_mimo_config': 'N{nss}X{nss}'.format(nss=test_config['nr_ul_mimo_config']), 'subcarrier_spacing': 'MU3' } cell_config_list.append(nr_cell) nr_cell_count = nr_cell_count + 1 nr_dl_carriers.append(nr_cell_idx) if nr_cell_idx <= test_config['num_ul_cells']: nr_ul_carriers.append(nr_cell_idx) endc_combo_config['lte_cell_count'] = lte_cell_count endc_combo_config['nr_cell_count'] = nr_cell_count endc_combo_config['nr_dl_carriers'] = nr_dl_carriers endc_combo_config['nr_ul_carriers'] = nr_ul_carriers endc_combo_config['cell_list'] = cell_config_list endc_combo_config['lte_scc_list'] = lte_scc_list endc_combo_config['lte_dl_carriers'] = lte_carriers endc_combo_config['lte_ul_carriers'] = lte_carriers return endc_combo_config def generate_test_cases(self, band_list, channel_list, dl_mcs_list, num_dl_cells_list, dl_mimo_config, orientation_list, **kwargs): """Function that auto-generates test cases for a test class.""" test_cases = [] for orientation, band, channel, num_dl_cells, nr_dl_mcs in itertools.product( orientation_list, band_list, channel_list, num_dl_cells_list, dl_mcs_list): if channel not in cputils.PCC_PRESET_MAPPING[band]: continue test_config = { 'nr_band': band, 'nr_bandwidth': 'BW100', 'nr_duplex_mode': 'TDD', 'nr_channel': channel, 'num_dl_cells': num_dl_cells, 'num_ul_cells': 1, 'nr_dl_mimo_config': dl_mimo_config, 'nr_ul_mimo_config': 1 } endc_combo_config = self.generate_endc_combo_config(test_config) test_name = 'test_fr2_{}_{}_{}CC_mcs{}_{}x{}'.format( band, channel.lower(), num_dl_cells, nr_dl_mcs, dl_mimo_config, dl_mimo_config) test_params = collections.OrderedDict( endc_combo_config=endc_combo_config, nr_dl_mcs=nr_dl_mcs, orientation=orientation, **kwargs) setattr(self, test_name, partial(self._test_throughput_bler, test_params)) test_cases.append(test_name) return test_cases