1#!/usr/bin/env python3.4 2# 3# Copyright 2022 - The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the 'License'); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an 'AS IS' BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16 17import collections 18import csv 19import itertools 20import numpy 21import json 22import os 23from acts import context 24from acts import base_test 25from acts.metrics.loggers.blackbox import BlackboxMappedMetricLogger 26from acts_contrib.test_utils.cellular.performance import cellular_performance_test_utils as cputils 27from acts_contrib.test_utils.wifi import wifi_performance_test_utils as wputils 28from acts_contrib.test_utils.wifi.wifi_performance_test_utils.bokeh_figure import BokehFigure 29from acts_contrib.test_utils.cellular.performance.CellularThroughputBaseTest import CellularThroughputBaseTest 30 31from functools import partial 32 33 34class CellularFr2SensitivityTest(CellularThroughputBaseTest): 35 """Class to test single cell FR1 NSA sensitivity""" 36 37 def __init__(self, controllers): 38 base_test.BaseTestClass.__init__(self, controllers) 39 self.testcase_metric_logger = ( 40 BlackboxMappedMetricLogger.for_test_case()) 41 self.testclass_metric_logger = ( 42 BlackboxMappedMetricLogger.for_test_class()) 43 self.publish_testcase_metrics = True 44 self.testclass_params = self.user_params['fr2_sensitivity_test_params'] 45 self.log.info('Hello') 46 self.tests = self.generate_test_cases( 47 band_list=['N257', 'N258', 'N260', 'N261'], 48 channel_list=['low', 'mid', 'high'], 49 dl_mcs_list=list(numpy.arange(28, -1, -1)), 50 num_dl_cells_list=[1, 2, 4, 8], 51 orientation_list=['A_Plane', 'B_Plane'], 52 dl_mimo_config=2, 53 nr_ul_mcs=4, 54 lte_dl_mcs_table='QAM256', 55 lte_dl_mcs=4, 56 lte_ul_mcs_table='QAM256', 57 lte_ul_mcs=4, 58 schedule_scenario="FULL_TPUT", 59 schedule_slot_ratio=80, 60 force_contiguous_nr_channel=True, 61 transform_precoding=0, 62 nr_dl_mcs_table='Q256', 63 nr_ul_mcs_table='Q64') 64 65 def process_testclass_results(self): 66 # Plot individual test id results raw data and compile metrics 67 plots = collections.OrderedDict() 68 compiled_data = collections.OrderedDict() 69 for testcase_name, testcase_data in self.testclass_results.items(): 70 nr_cell_index = testcase_data['testcase_params'][ 71 'endc_combo_config']['lte_cell_count'] 72 cell_config = testcase_data['testcase_params'][ 73 'endc_combo_config']['cell_list'][nr_cell_index] 74 test_id = tuple(('band', cell_config['band'])) 75 if test_id not in plots: 76 # Initialize test id data when not present 77 compiled_data[test_id] = { 78 'mcs': [], 79 'average_throughput': [], 80 'theoretical_throughput': [], 81 'cell_power': [], 82 } 83 plots[test_id] = BokehFigure( 84 title='Band {} - BLER Curves'.format(cell_config['band']), 85 x_label='Cell Power (dBm)', 86 primary_y_label='BLER (Mbps)') 87 test_id_rvr = test_id + tuple('RvR') 88 plots[test_id_rvr] = BokehFigure( 89 title='Band {} - RvR'.format(cell_config['band']), 90 x_label='Cell Power (dBm)', 91 primary_y_label='PHY Rate (Mbps)') 92 # Compile test id data and metrics 93 compiled_data[test_id]['average_throughput'].append( 94 testcase_data['average_throughput_list']) 95 compiled_data[test_id]['cell_power'].append( 96 testcase_data['cell_power_list']) 97 compiled_data[test_id]['mcs'].append( 98 testcase_data['testcase_params']['nr_dl_mcs']) 99 # Add test id to plots 100 plots[test_id].add_line( 101 testcase_data['cell_power_list'], 102 testcase_data['bler_list'], 103 'MCS {}'.format(testcase_data['testcase_params']['nr_dl_mcs']), 104 width=1) 105 plots[test_id_rvr].add_line( 106 testcase_data['cell_power_list'], 107 testcase_data['average_throughput_list'], 108 'MCS {}'.format(testcase_data['testcase_params']['nr_dl_mcs']), 109 width=1, 110 style='dashed') 111 112 for test_id, test_data in compiled_data.items(): 113 test_id_rvr = test_id + tuple('RvR') 114 cell_power_interp = sorted(set(sum(test_data['cell_power'], []))) 115 average_throughput_interp = [] 116 for mcs, cell_power, throughput in zip( 117 test_data['mcs'], test_data['cell_power'], 118 test_data['average_throughput']): 119 throughput_interp = numpy.interp(cell_power_interp, 120 cell_power[::-1], 121 throughput[::-1]) 122 average_throughput_interp.append(throughput_interp) 123 rvr = numpy.max(average_throughput_interp, 0) 124 plots[test_id_rvr].add_line(cell_power_interp, rvr, 125 'Rate vs. Range') 126 127 figure_list = [] 128 for plot_id, plot in plots.items(): 129 plot.generate_figure() 130 figure_list.append(plot) 131 output_file_path = os.path.join(self.log_path, 'results.html') 132 BokehFigure.save_figures(figure_list, output_file_path) 133 """Saves CSV with all test results to enable comparison.""" 134 results_file_path = os.path.join( 135 context.get_current_context().get_full_output_path(), 136 'results.csv') 137 with open(results_file_path, 'w', newline='') as csvfile: 138 field_names = ['Test Name', 'Sensitivity'] 139 writer = csv.DictWriter(csvfile, fieldnames=field_names) 140 writer.writeheader() 141 142 for testcase_name, testcase_results in self.testclass_results.items( 143 ): 144 row_dict = { 145 'Test Name': testcase_name, 146 'Sensitivity': testcase_results['sensitivity'] 147 } 148 writer.writerow(row_dict) 149 150 def process_testcase_results(self): 151 if self.current_test_name not in self.testclass_results: 152 return 153 testcase_data = self.testclass_results[self.current_test_name] 154 155 bler_list = [] 156 average_throughput_list = [] 157 theoretical_throughput_list = [] 158 nr_cell_index = testcase_data['testcase_params']['endc_combo_config'][ 159 'lte_cell_count'] 160 cell_power_list = testcase_data['testcase_params']['cell_power_sweep'][ 161 nr_cell_index] 162 for result in testcase_data['results']: 163 bler_list.append(result['throughput_measurements'] 164 ['nr_bler_result']['total']['DL']['nack_ratio']) 165 average_throughput_list.append( 166 result['throughput_measurements']['nr_tput_result']['total'] 167 ['DL']['average_tput']) 168 theoretical_throughput_list.append( 169 result['throughput_measurements']['nr_tput_result']['total'] 170 ['DL']['theoretical_tput']) 171 padding_len = len(cell_power_list) - len(average_throughput_list) 172 average_throughput_list.extend([0] * padding_len) 173 theoretical_throughput_list.extend([0] * padding_len) 174 175 bler_above_threshold = [ 176 bler > self.testclass_params['bler_threshold'] 177 for bler in bler_list 178 ] 179 for idx in range(len(bler_above_threshold)): 180 if all(bler_above_threshold[idx:]): 181 sensitivity_idx = max(idx, 1) - 1 182 break 183 else: 184 sensitivity_idx = -1 185 sensitivity = cell_power_list[sensitivity_idx] 186 self.log.info('NR Band {} MCS {} Sensitivity = {}dBm'.format( 187 testcase_data['testcase_params']['endc_combo_config']['cell_list'] 188 [nr_cell_index]['band'], 189 testcase_data['testcase_params']['nr_dl_mcs'], sensitivity)) 190 191 testcase_data['bler_list'] = bler_list 192 testcase_data['average_throughput_list'] = average_throughput_list 193 testcase_data[ 194 'theoretical_throughput_list'] = theoretical_throughput_list 195 testcase_data['cell_power_list'] = cell_power_list 196 testcase_data['sensitivity'] = sensitivity 197 198 results_file_path = os.path.join( 199 context.get_current_context().get_full_output_path(), 200 '{}.json'.format(self.current_test_name)) 201 with open(results_file_path, 'w') as results_file: 202 json.dump(wputils.serialize_dict(testcase_data), 203 results_file, 204 indent=4) 205 206 def get_per_cell_power_sweeps(self, testcase_params): 207 # get reference test 208 current_band = testcase_params['endc_combo_config']['cell_list'][1][ 209 'band'] 210 reference_test = None 211 reference_sensitivity = None 212 for testcase_name, testcase_data in self.testclass_results.items(): 213 if testcase_data['testcase_params']['endc_combo_config'][ 214 'cell_list'][1]['band'] == current_band: 215 reference_test = testcase_name 216 reference_sensitivity = testcase_data['sensitivity'] 217 if reference_test and reference_sensitivity and not self.retry_flag: 218 start_atten = reference_sensitivity + self.testclass_params[ 219 'adjacent_mcs_gap'] 220 self.log.info( 221 "Reference test {} found. Sensitivity {} dBm. Starting at {} dBm" 222 .format(reference_test, reference_sensitivity, start_atten)) 223 else: 224 start_atten = self.testclass_params['nr_cell_power_start'] 225 self.log.info( 226 "Reference test not found. Starting at {} dBm".format( 227 start_atten)) 228 # get current cell power start 229 nr_cell_sweep = list( 230 numpy.arange(start_atten, 231 self.testclass_params['nr_cell_power_stop'], 232 self.testclass_params['nr_cell_power_step'])) 233 lte_sweep = [self.testclass_params['lte_cell_power'] 234 ] * len(nr_cell_sweep) 235 cell_power_sweeps = [lte_sweep] 236 cell_power_sweeps.extend( 237 [nr_cell_sweep] * 238 testcase_params['endc_combo_config']['nr_cell_count']) 239 return cell_power_sweeps 240 241 def generate_endc_combo_config(self, test_config): 242 """Function to generate ENDC combo config from CSV test config 243 244 Args: 245 test_config: dict containing ENDC combo config from CSV 246 Returns: 247 endc_combo_config: dictionary with all ENDC combo settings 248 """ 249 endc_combo_config = collections.OrderedDict() 250 cell_config_list = [] 251 252 lte_cell_count = 1 253 lte_carriers = [1] 254 lte_scc_list = [] 255 endc_combo_config['lte_pcc'] = 1 256 lte_cell = { 257 'cell_type': 'LTE', 258 'cell_number': 1, 259 'pcc': 1, 260 'band': self.testclass_params['lte_anchor_band'], 261 'dl_bandwidth': self.testclass_params['lte_anchor_bandwidth'], 262 'ul_enabled': 1, 263 'duplex_mode': self.testclass_params['lte_anchor_duplex_mode'], 264 'dl_mimo_config': 'D{nss}U{nss}'.format(nss=1), 265 'ul_mimo_config': 'D{nss}U{nss}'.format(nss=1), 266 'transmission_mode': 'TM1', 267 'num_codewords': 1, 268 'num_layers': 1, 269 'dl_subframe_allocation': [1] * 10, 270 } 271 cell_config_list.append(lte_cell) 272 273 nr_cell_count = 0 274 nr_dl_carriers = [] 275 nr_ul_carriers = [] 276 for nr_cell_idx in range(1, test_config['num_dl_cells'] + 1): 277 nr_cell = { 278 'cell_type': 279 'NR5G', 280 'cell_number': 281 nr_cell_idx, 282 'nr_cell_type': 283 'NSA', 284 'band': 285 test_config['nr_band'], 286 'duplex_mode': 287 test_config['nr_duplex_mode'], 288 'channel': 289 test_config['nr_channel'], 290 'dl_mimo_config': 291 'N{nss}X{nss}'.format(nss=test_config['nr_dl_mimo_config']), 292 'dl_bandwidth_class': 293 'A', 294 'dl_bandwidth': 295 test_config['nr_bandwidth'], 296 'ul_enabled': 297 1 if nr_cell_idx <= test_config['num_ul_cells'] else 0, 298 'ul_bandwidth_class': 299 'A', 300 'ul_mimo_config': 301 'N{nss}X{nss}'.format(nss=test_config['nr_ul_mimo_config']), 302 'subcarrier_spacing': 303 'MU3' 304 } 305 cell_config_list.append(nr_cell) 306 nr_cell_count = nr_cell_count + 1 307 nr_dl_carriers.append(nr_cell_idx) 308 if nr_cell_idx <= test_config['num_ul_cells']: 309 nr_ul_carriers.append(nr_cell_idx) 310 311 endc_combo_config['lte_cell_count'] = lte_cell_count 312 endc_combo_config['nr_cell_count'] = nr_cell_count 313 endc_combo_config['nr_dl_carriers'] = nr_dl_carriers 314 endc_combo_config['nr_ul_carriers'] = nr_ul_carriers 315 endc_combo_config['cell_list'] = cell_config_list 316 endc_combo_config['lte_scc_list'] = lte_scc_list 317 endc_combo_config['lte_dl_carriers'] = lte_carriers 318 endc_combo_config['lte_ul_carriers'] = lte_carriers 319 return endc_combo_config 320 321 def generate_test_cases(self, band_list, channel_list, dl_mcs_list, 322 num_dl_cells_list, dl_mimo_config, 323 orientation_list, **kwargs): 324 """Function that auto-generates test cases for a test class.""" 325 test_cases = [] 326 for orientation, band, channel, num_dl_cells, nr_dl_mcs in itertools.product( 327 orientation_list, band_list, channel_list, num_dl_cells_list, 328 dl_mcs_list): 329 if channel not in cputils.PCC_PRESET_MAPPING[band]: 330 continue 331 test_config = { 332 'nr_band': band, 333 'nr_bandwidth': 'BW100', 334 'nr_duplex_mode': 'TDD', 335 'nr_channel': channel, 336 'num_dl_cells': num_dl_cells, 337 'num_ul_cells': 1, 338 'nr_dl_mimo_config': dl_mimo_config, 339 'nr_ul_mimo_config': 1 340 } 341 endc_combo_config = self.generate_endc_combo_config(test_config) 342 test_name = 'test_fr2_{}_{}_{}CC_mcs{}_{}x{}'.format( 343 band, channel.lower(), num_dl_cells, nr_dl_mcs, dl_mimo_config, 344 dl_mimo_config) 345 test_params = collections.OrderedDict( 346 endc_combo_config=endc_combo_config, 347 nr_dl_mcs=nr_dl_mcs, 348 orientation=orientation, 349 **kwargs) 350 setattr(self, test_name, 351 partial(self._test_throughput_bler, test_params)) 352 test_cases.append(test_name) 353 return test_cases 354