• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3.4
2#
3#   Copyright 2022 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the 'License');
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an 'AS IS' BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17import collections
18import csv
19import itertools
20import numpy
21import json
22import os
23from acts import context
24from acts import base_test
25from acts.metrics.loggers.blackbox import BlackboxMappedMetricLogger
26from acts_contrib.test_utils.cellular.performance import cellular_performance_test_utils as cputils
27from acts_contrib.test_utils.wifi import wifi_performance_test_utils as wputils
28from acts_contrib.test_utils.wifi.wifi_performance_test_utils.bokeh_figure import BokehFigure
29from acts_contrib.test_utils.cellular.performance.CellularThroughputBaseTest import CellularThroughputBaseTest
30
31from functools import partial
32
33
34class CellularFr2SensitivityTest(CellularThroughputBaseTest):
35    """Class to test single cell FR1 NSA sensitivity"""
36
37    def __init__(self, controllers):
38        base_test.BaseTestClass.__init__(self, controllers)
39        self.testcase_metric_logger = (
40            BlackboxMappedMetricLogger.for_test_case())
41        self.testclass_metric_logger = (
42            BlackboxMappedMetricLogger.for_test_class())
43        self.publish_testcase_metrics = True
44        self.testclass_params = self.user_params['fr2_sensitivity_test_params']
45        self.log.info('Hello')
46        self.tests = self.generate_test_cases(
47            band_list=['N257', 'N258', 'N260', 'N261'],
48            channel_list=['low', 'mid', 'high'],
49            dl_mcs_list=list(numpy.arange(28, -1, -1)),
50            num_dl_cells_list=[1, 2, 4, 8],
51            orientation_list=['A_Plane', 'B_Plane'],
52            dl_mimo_config=2,
53            nr_ul_mcs=4,
54            lte_dl_mcs_table='QAM256',
55            lte_dl_mcs=4,
56            lte_ul_mcs_table='QAM256',
57            lte_ul_mcs=4,
58            schedule_scenario="FULL_TPUT",
59            schedule_slot_ratio=80,
60            force_contiguous_nr_channel=True,
61            transform_precoding=0,
62            nr_dl_mcs_table='Q256',
63            nr_ul_mcs_table='Q64')
64
65    def process_testclass_results(self):
66        # Plot individual test id results raw data and compile metrics
67        plots = collections.OrderedDict()
68        compiled_data = collections.OrderedDict()
69        for testcase_name, testcase_data in self.testclass_results.items():
70            nr_cell_index = testcase_data['testcase_params'][
71                'endc_combo_config']['lte_cell_count']
72            cell_config = testcase_data['testcase_params'][
73                'endc_combo_config']['cell_list'][nr_cell_index]
74            test_id = tuple(('band', cell_config['band']))
75            if test_id not in plots:
76                # Initialize test id data when not present
77                compiled_data[test_id] = {
78                    'mcs': [],
79                    'average_throughput': [],
80                    'theoretical_throughput': [],
81                    'cell_power': [],
82                }
83                plots[test_id] = BokehFigure(
84                    title='Band {} - BLER Curves'.format(cell_config['band']),
85                    x_label='Cell Power (dBm)',
86                    primary_y_label='BLER (Mbps)')
87                test_id_rvr = test_id + tuple('RvR')
88                plots[test_id_rvr] = BokehFigure(
89                    title='Band {} - RvR'.format(cell_config['band']),
90                    x_label='Cell Power (dBm)',
91                    primary_y_label='PHY Rate (Mbps)')
92            # Compile test id data and metrics
93            compiled_data[test_id]['average_throughput'].append(
94                testcase_data['average_throughput_list'])
95            compiled_data[test_id]['cell_power'].append(
96                testcase_data['cell_power_list'])
97            compiled_data[test_id]['mcs'].append(
98                testcase_data['testcase_params']['nr_dl_mcs'])
99            # Add test id to plots
100            plots[test_id].add_line(
101                testcase_data['cell_power_list'],
102                testcase_data['bler_list'],
103                'MCS {}'.format(testcase_data['testcase_params']['nr_dl_mcs']),
104                width=1)
105            plots[test_id_rvr].add_line(
106                testcase_data['cell_power_list'],
107                testcase_data['average_throughput_list'],
108                'MCS {}'.format(testcase_data['testcase_params']['nr_dl_mcs']),
109                width=1,
110                style='dashed')
111
112        for test_id, test_data in compiled_data.items():
113            test_id_rvr = test_id + tuple('RvR')
114            cell_power_interp = sorted(set(sum(test_data['cell_power'], [])))
115            average_throughput_interp = []
116            for mcs, cell_power, throughput in zip(
117                    test_data['mcs'], test_data['cell_power'],
118                    test_data['average_throughput']):
119                throughput_interp = numpy.interp(cell_power_interp,
120                                                 cell_power[::-1],
121                                                 throughput[::-1])
122                average_throughput_interp.append(throughput_interp)
123            rvr = numpy.max(average_throughput_interp, 0)
124            plots[test_id_rvr].add_line(cell_power_interp, rvr,
125                                        'Rate vs. Range')
126
127        figure_list = []
128        for plot_id, plot in plots.items():
129            plot.generate_figure()
130            figure_list.append(plot)
131        output_file_path = os.path.join(self.log_path, 'results.html')
132        BokehFigure.save_figures(figure_list, output_file_path)
133        """Saves CSV with all test results to enable comparison."""
134        results_file_path = os.path.join(
135            context.get_current_context().get_full_output_path(),
136            'results.csv')
137        with open(results_file_path, 'w', newline='') as csvfile:
138            field_names = ['Test Name', 'Sensitivity']
139            writer = csv.DictWriter(csvfile, fieldnames=field_names)
140            writer.writeheader()
141
142            for testcase_name, testcase_results in self.testclass_results.items(
143            ):
144                row_dict = {
145                    'Test Name': testcase_name,
146                    'Sensitivity': testcase_results['sensitivity']
147                }
148                writer.writerow(row_dict)
149
150    def process_testcase_results(self):
151        if self.current_test_name not in self.testclass_results:
152            return
153        testcase_data = self.testclass_results[self.current_test_name]
154
155        bler_list = []
156        average_throughput_list = []
157        theoretical_throughput_list = []
158        nr_cell_index = testcase_data['testcase_params']['endc_combo_config'][
159            'lte_cell_count']
160        cell_power_list = testcase_data['testcase_params']['cell_power_sweep'][
161            nr_cell_index]
162        for result in testcase_data['results']:
163            bler_list.append(result['throughput_measurements']
164                             ['nr_bler_result']['total']['DL']['nack_ratio'])
165            average_throughput_list.append(
166                result['throughput_measurements']['nr_tput_result']['total']
167                ['DL']['average_tput'])
168            theoretical_throughput_list.append(
169                result['throughput_measurements']['nr_tput_result']['total']
170                ['DL']['theoretical_tput'])
171        padding_len = len(cell_power_list) - len(average_throughput_list)
172        average_throughput_list.extend([0] * padding_len)
173        theoretical_throughput_list.extend([0] * padding_len)
174
175        bler_above_threshold = [
176            bler > self.testclass_params['bler_threshold']
177            for bler in bler_list
178        ]
179        for idx in range(len(bler_above_threshold)):
180            if all(bler_above_threshold[idx:]):
181                sensitivity_idx = max(idx, 1) - 1
182                break
183        else:
184            sensitivity_idx = -1
185        sensitivity = cell_power_list[sensitivity_idx]
186        self.log.info('NR Band {} MCS {} Sensitivity = {}dBm'.format(
187            testcase_data['testcase_params']['endc_combo_config']['cell_list']
188            [nr_cell_index]['band'],
189            testcase_data['testcase_params']['nr_dl_mcs'], sensitivity))
190
191        testcase_data['bler_list'] = bler_list
192        testcase_data['average_throughput_list'] = average_throughput_list
193        testcase_data[
194            'theoretical_throughput_list'] = theoretical_throughput_list
195        testcase_data['cell_power_list'] = cell_power_list
196        testcase_data['sensitivity'] = sensitivity
197
198        results_file_path = os.path.join(
199            context.get_current_context().get_full_output_path(),
200            '{}.json'.format(self.current_test_name))
201        with open(results_file_path, 'w') as results_file:
202            json.dump(wputils.serialize_dict(testcase_data),
203                      results_file,
204                      indent=4)
205
206    def get_per_cell_power_sweeps(self, testcase_params):
207        # get reference test
208        current_band = testcase_params['endc_combo_config']['cell_list'][1][
209            'band']
210        reference_test = None
211        reference_sensitivity = None
212        for testcase_name, testcase_data in self.testclass_results.items():
213            if testcase_data['testcase_params']['endc_combo_config'][
214                    'cell_list'][1]['band'] == current_band:
215                reference_test = testcase_name
216                reference_sensitivity = testcase_data['sensitivity']
217        if reference_test and reference_sensitivity and not self.retry_flag:
218            start_atten = reference_sensitivity + self.testclass_params[
219                'adjacent_mcs_gap']
220            self.log.info(
221                "Reference test {} found. Sensitivity {} dBm. Starting at {} dBm"
222                .format(reference_test, reference_sensitivity, start_atten))
223        else:
224            start_atten = self.testclass_params['nr_cell_power_start']
225            self.log.info(
226                "Reference test not found. Starting at {} dBm".format(
227                    start_atten))
228        # get current cell power start
229        nr_cell_sweep = list(
230            numpy.arange(start_atten,
231                         self.testclass_params['nr_cell_power_stop'],
232                         self.testclass_params['nr_cell_power_step']))
233        lte_sweep = [self.testclass_params['lte_cell_power']
234                     ] * len(nr_cell_sweep)
235        cell_power_sweeps = [lte_sweep]
236        cell_power_sweeps.extend(
237            [nr_cell_sweep] *
238            testcase_params['endc_combo_config']['nr_cell_count'])
239        return cell_power_sweeps
240
241    def generate_endc_combo_config(self, test_config):
242        """Function to generate ENDC combo config from CSV test config
243
244        Args:
245            test_config: dict containing ENDC combo config from CSV
246        Returns:
247            endc_combo_config: dictionary with all ENDC combo settings
248        """
249        endc_combo_config = collections.OrderedDict()
250        cell_config_list = []
251
252        lte_cell_count = 1
253        lte_carriers = [1]
254        lte_scc_list = []
255        endc_combo_config['lte_pcc'] = 1
256        lte_cell = {
257            'cell_type': 'LTE',
258            'cell_number': 1,
259            'pcc': 1,
260            'band': self.testclass_params['lte_anchor_band'],
261            'dl_bandwidth': self.testclass_params['lte_anchor_bandwidth'],
262            'ul_enabled': 1,
263            'duplex_mode': self.testclass_params['lte_anchor_duplex_mode'],
264            'dl_mimo_config': 'D{nss}U{nss}'.format(nss=1),
265            'ul_mimo_config': 'D{nss}U{nss}'.format(nss=1),
266            'transmission_mode': 'TM1',
267            'num_codewords': 1,
268            'num_layers': 1,
269            'dl_subframe_allocation': [1] * 10,
270        }
271        cell_config_list.append(lte_cell)
272
273        nr_cell_count = 0
274        nr_dl_carriers = []
275        nr_ul_carriers = []
276        for nr_cell_idx in range(1, test_config['num_dl_cells'] + 1):
277            nr_cell = {
278                'cell_type':
279                'NR5G',
280                'cell_number':
281                nr_cell_idx,
282                'nr_cell_type':
283                'NSA',
284                'band':
285                test_config['nr_band'],
286                'duplex_mode':
287                test_config['nr_duplex_mode'],
288                'channel':
289                test_config['nr_channel'],
290                'dl_mimo_config':
291                'N{nss}X{nss}'.format(nss=test_config['nr_dl_mimo_config']),
292                'dl_bandwidth_class':
293                'A',
294                'dl_bandwidth':
295                test_config['nr_bandwidth'],
296                'ul_enabled':
297                1 if nr_cell_idx <= test_config['num_ul_cells'] else 0,
298                'ul_bandwidth_class':
299                'A',
300                'ul_mimo_config':
301                'N{nss}X{nss}'.format(nss=test_config['nr_ul_mimo_config']),
302                'subcarrier_spacing':
303                'MU3'
304            }
305            cell_config_list.append(nr_cell)
306            nr_cell_count = nr_cell_count + 1
307            nr_dl_carriers.append(nr_cell_idx)
308            if nr_cell_idx <= test_config['num_ul_cells']:
309                nr_ul_carriers.append(nr_cell_idx)
310
311        endc_combo_config['lte_cell_count'] = lte_cell_count
312        endc_combo_config['nr_cell_count'] = nr_cell_count
313        endc_combo_config['nr_dl_carriers'] = nr_dl_carriers
314        endc_combo_config['nr_ul_carriers'] = nr_ul_carriers
315        endc_combo_config['cell_list'] = cell_config_list
316        endc_combo_config['lte_scc_list'] = lte_scc_list
317        endc_combo_config['lte_dl_carriers'] = lte_carriers
318        endc_combo_config['lte_ul_carriers'] = lte_carriers
319        return endc_combo_config
320
321    def generate_test_cases(self, band_list, channel_list, dl_mcs_list,
322                            num_dl_cells_list, dl_mimo_config,
323                            orientation_list, **kwargs):
324        """Function that auto-generates test cases for a test class."""
325        test_cases = []
326        for orientation, band, channel, num_dl_cells, nr_dl_mcs in itertools.product(
327                orientation_list, band_list, channel_list, num_dl_cells_list,
328                dl_mcs_list):
329            if channel not in cputils.PCC_PRESET_MAPPING[band]:
330                continue
331            test_config = {
332                'nr_band': band,
333                'nr_bandwidth': 'BW100',
334                'nr_duplex_mode': 'TDD',
335                'nr_channel': channel,
336                'num_dl_cells': num_dl_cells,
337                'num_ul_cells': 1,
338                'nr_dl_mimo_config': dl_mimo_config,
339                'nr_ul_mimo_config': 1
340            }
341            endc_combo_config = self.generate_endc_combo_config(test_config)
342            test_name = 'test_fr2_{}_{}_{}CC_mcs{}_{}x{}'.format(
343                band, channel.lower(), num_dl_cells, nr_dl_mcs, dl_mimo_config,
344                dl_mimo_config)
345            test_params = collections.OrderedDict(
346                endc_combo_config=endc_combo_config,
347                nr_dl_mcs=nr_dl_mcs,
348                orientation=orientation,
349                **kwargs)
350            setattr(self, test_name,
351                    partial(self._test_throughput_bler, test_params))
352            test_cases.append(test_name)
353        return test_cases
354