• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3.4
2#
3#   Copyright 2022 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the 'License');
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an 'AS IS' BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17import collections
18import csv
19import itertools
20import json
21import re
22
23import numpy
24import os
25import time
26from acts import asserts
27from acts import context
28from acts import base_test
29from acts import utils
30from acts.metrics.loggers.blackbox import BlackboxMappedMetricLogger
31from acts.controllers.utils_lib import ssh
32from acts.controllers import iperf_server as ipf
33from acts.controllers import power_monitor as power_monitor_lib
34from acts_contrib.test_utils.cellular.keysight_5g_testapp import Keysight5GTestApp
35from acts_contrib.test_utils.cellular.keysight_chamber import KeysightChamber
36from acts_contrib.test_utils.cellular.performance import cellular_performance_test_utils as cputils
37from acts_contrib.test_utils.wifi import wifi_performance_test_utils as wputils
38from acts_contrib.test_utils.power import plot_utils as power_plot_utils
39
40LONG_SLEEP = 10
41MEDIUM_SLEEP = 2
42IPERF_TIMEOUT = 10
43SHORT_SLEEP = 1
44VERY_SHORT_SLEEP = 0.1
45SUBFRAME_LENGTH = 0.001
46STOP_COUNTER_LIMIT = 3
47RESET_BATTERY_STATS = 'dumpsys batterystats --reset'
48DEFAULT_MONSOON_FREQUENCY = 500
49PHONE_BATTERY_VOLTAGE_DEFAULT = 4
50
51from functools import wraps
52import logging
53
54def suspend_logging(func):
55
56    @wraps(func)
57    def inner(*args, **kwargs):
58        logging.disable(logging.FATAL)
59        try:
60            return func(*args, **kwargs)
61        finally:
62            logging.disable(logging.NOTSET)
63
64    return inner
65
66
67class CellularThroughputBaseTest(base_test.BaseTestClass):
68    """Base class for Cellular Throughput Testing
69
70    This base class enables cellular throughput tests on a lab/callbox setup
71    with PHY layer or iperf traffic.
72    """
73
74    def __init__(self, controllers):
75        base_test.BaseTestClass.__init__(self, controllers)
76        self.testcase_metric_logger = (
77            BlackboxMappedMetricLogger.for_test_case())
78        self.testclass_metric_logger = (
79            BlackboxMappedMetricLogger.for_test_class())
80        self.publish_testcase_metrics = True
81        self.testclass_params = {}
82
83    def setup_class(self):
84        """Initializes common test hardware and parameters.
85
86        This function initializes hardwares and compiles parameters that are
87        common to all tests in this class.
88        """
89        # Setup controllers
90        self.dut = self.android_devices[-1]
91        self.dut_utils = cputils.DeviceUtils(self.dut, self.log)
92        self.keysight_test_app = Keysight5GTestApp(
93            self.user_params['Keysight5GTestApp'])
94        if 'KeysightChamber' in self.user_params:
95            self.keysight_chamber = KeysightChamber(
96                self.user_params['KeysightChamber'])
97        self.remote_server = ssh.connection.SshConnection(
98            ssh.settings.from_config(
99                self.user_params['RemoteServer']['ssh_config']))
100
101        self.unpack_userparams(MonsoonParams=None,
102                               bits_root_rail_csv_export=False)
103        self.power_monitor = self.initialize_power_monitor()
104
105        # Configure Tester
106        if self.testclass_params.get('reload_scpi', 0):
107            self.keysight_test_app.import_scpi_file(
108                self.testclass_params['scpi_file'])
109
110        # Declare testclass variables
111        self.testclass_results = collections.OrderedDict()
112
113        # Configure test retries
114        self.user_params['retry_tests'] = [self.__class__.__name__]
115
116        # Turn Airplane mode on
117        self.dut_utils.toggle_airplane_mode(True, False)
118
119    def teardown_class(self):
120        if self.power_monitor:
121            self.power_monitor.connect_usb()
122            self.dut.wait_for_boot_completion()
123            self.dut_utils.start_services()
124        self.log.info('Turning airplane mode on')
125        try:
126            self.dut_utils.toggle_airplane_mode(True, False)
127        except:
128            self.log.warning('Cannot perform teardown operations on DUT.')
129        try:
130            self.keysight_test_app.turn_all_cells_off()
131            self.keysight_test_app.destroy()
132        except:
133            self.log.warning('Cannot perform teardown operations on tester.')
134        self.process_testclass_results()
135
136    def setup_test(self):
137        self.retry_flag = False
138        if self.testclass_params.get('enable_pixel_logs', 0):
139            self.dut_utils.start_pixel_logger()
140
141    def teardown_test(self):
142        self.process_testcase_results()
143        if self.power_monitor:
144            self.log.info('Reconnecting USB and waiting for boot completion.')
145            self.power_monitor.connect_usb()
146            self.dut.wait_for_boot_completion()
147            self.dut_utils.start_services()
148        self.retry_flag = False
149        self.log.info('Turing airplane mode on')
150        self.dut_utils.toggle_airplane_mode(True, False)
151        self.log.info('Turning all cells off.')
152        self.keysight_test_app.turn_all_cells_off()
153        log_path = os.path.join(
154            context.get_current_context().get_full_output_path(), 'pixel_logs')
155        os.makedirs(self.log_path, exist_ok=True)
156        if self.testclass_params.get('enable_pixel_logs', 0):
157            self.dut_utils.stop_pixel_logger(log_path)
158        self.pass_fail_check()
159
160    def on_retry(self):
161        """Function to control test logic on retried tests.
162
163        This function is automatically executed on tests that are being
164        retried. In this case the function resets wifi, toggles it off and on
165        and sets a retry_flag to enable further tweaking the test logic on
166        second attempts.
167        """
168        self.dut_utils.toggle_airplane_mode(True, False)
169        if self.keysight_test_app.get_cell_state('LTE', 'CELL1'):
170            self.log.info('Turning LTE off.')
171            self.keysight_test_app.set_cell_state('LTE', 'CELL1', 0)
172        self.retry_flag = True
173
174    def initialize_power_monitor(self):
175        """ Initializes the power monitor object.
176
177        Raises an exception if there are no controllers available.
178        """
179
180        device_time = self.dut.adb.shell('echo $EPOCHREALTIME')
181        host_time = time.time()
182        self.log.debug('device start time %s, host start time %s', device_time,
183                       host_time)
184        self.device_to_host_offset = float(device_time) - host_time
185        self.power_monitor_config = {}
186        if hasattr(self, 'bitses') and self.testclass_params.get('measure_power', 1):
187            power_monitor = self.bitses[0]
188            power_monitor.setup(registry=self.user_params)
189            self.power_monitor_config = {'voltage': self.user_params['Bits'][0]['Monsoon']['monsoon_voltage'],
190                                         'frequency': 976.5925,
191                                         'measurement_type': 'power'}
192        elif hasattr(self, 'monsoons') and self.testclass_params.get('measure_power', 1):
193            power_monitor = power_monitor_lib.PowerMonitorMonsoonFacade(
194                self.monsoons[0])
195            self.monsoons[0].set_max_current(self.MonsoonParams['current'])
196            self.monsoons[0].set_voltage(self.MonsoonParams['voltage'])
197            self.power_monitor_config = {'voltage': self.MonsoonParams['voltage'],
198                                         'frequency': self.MonsoonParams['frequency'],
199                                         'measurement_type': 'current'}
200        else:
201            power_monitor = None
202        return power_monitor
203
204    def pass_fail_check(self):
205        pass
206
207    def process_testcase_results(self):
208        pass
209
210    def process_testclass_results(self):
211        pass
212
213    def get_per_cell_power_sweeps(self, testcase_params):
214        raise NotImplementedError(
215            'get_per_cell_power_sweeps must be implemented.')
216
217    def compile_test_params(self, testcase_params):
218        """Function that completes all test params based on the test name.
219
220        Args:
221            testcase_params: dict containing test-specific parameters
222        """
223        # Measurement Duration
224        testcase_params['bler_measurement_length'] = int(
225            self.testclass_params['traffic_duration'] / SUBFRAME_LENGTH)
226        # Cell power sweep
227        # TODO: Make this a function to support single power and sweep modes for each cell
228        testcase_params['cell_power_sweep'] = self.get_per_cell_power_sweeps(
229            testcase_params)
230        # Traffic & iperf params
231        if self.testclass_params['traffic_type'] == 'PHY':
232            return testcase_params
233        if self.testclass_params['traffic_type'] == 'TCP':
234            testcase_params['iperf_socket_size'] = self.testclass_params.get(
235                'tcp_socket_size', None)
236            testcase_params['iperf_processes'] = self.testclass_params.get(
237                'tcp_processes', 1)
238        elif self.testclass_params['traffic_type'] == 'UDP':
239            testcase_params['iperf_socket_size'] = self.testclass_params.get(
240                'udp_socket_size', None)
241            testcase_params['iperf_processes'] = self.testclass_params.get(
242                'udp_processes', 1)
243        adb_iperf_server = isinstance(self.iperf_servers[0],
244                                      ipf.IPerfServerOverAdb)
245        if testcase_params['traffic_direction'] == 'DL':
246            reverse_direction = 0 if adb_iperf_server else 1
247            testcase_params[
248                'use_client_output'] = False if adb_iperf_server else True
249        elif testcase_params['traffic_direction'] == 'UL':
250            reverse_direction = 1 if adb_iperf_server else 0
251            testcase_params[
252                'use_client_output'] = True if adb_iperf_server else False
253        testcase_params['iperf_args'] = wputils.get_iperf_arg_string(
254            duration=self.testclass_params['traffic_duration'],
255            reverse_direction=reverse_direction,
256            traffic_type=self.testclass_params['traffic_type'],
257            socket_size=testcase_params['iperf_socket_size'],
258            num_processes=testcase_params['iperf_processes'],
259            udp_throughput=self.testclass_params['UDP_rates'].get(
260                testcase_params['num_dl_cells'],
261                self.testclass_params['UDP_rates']["default"]),
262            udp_length=1440)
263        return testcase_params
264
265    def run_iperf_traffic(self, testcase_params):
266        self.iperf_servers[0].start(tag=0)
267        dut_ip = self.dut.droid.connectivityGetIPv4Addresses('rmnet0')[0]
268        if 'iperf_server_address' in self.testclass_params:
269            iperf_server_address = self.testclass_params[
270                'iperf_server_address']
271        elif isinstance(self.iperf_servers[0], ipf.IPerfServerOverAdb):
272            iperf_server_address = dut_ip
273        else:
274            iperf_server_address = wputils.get_server_address(
275                self.remote_server, dut_ip, '255.255.255.0')
276        client_output_path = self.iperf_clients[0].start(
277            iperf_server_address, testcase_params['iperf_args'], 0,
278            self.testclass_params['traffic_duration'] + IPERF_TIMEOUT)
279        server_output_path = self.iperf_servers[0].stop()
280        # Parse and log result
281        if testcase_params['use_client_output']:
282            iperf_file = client_output_path
283        else:
284            iperf_file = server_output_path
285        try:
286            iperf_result = ipf.IPerfResult(iperf_file)
287            current_throughput = numpy.mean(iperf_result.instantaneous_rates[
288                self.testclass_params['iperf_ignored_interval']:-1]) * 8 * (
289                    1.024**2)
290        except:
291            self.log.warning(
292                'ValueError: Cannot get iperf result. Setting to 0')
293            current_throughput = 0
294        return current_throughput
295
296    def start_single_throughput_measurement(self, testcase_params):
297        self.log.info('Starting BLER & throughput tests.')
298        if testcase_params['endc_combo_config']['nr_cell_count']:
299            self.keysight_test_app.start_bler_measurement(
300                'NR5G', testcase_params['endc_combo_config']['nr_dl_carriers'],
301                testcase_params['bler_measurement_length'])
302        if testcase_params['endc_combo_config']['lte_cell_count']:
303            self.keysight_test_app.start_bler_measurement(
304                'LTE',
305                testcase_params['endc_combo_config']['lte_dl_carriers'][0],
306                testcase_params['bler_measurement_length'])
307        if self.testclass_params['traffic_type'] != 'PHY':
308            #TODO: get iperf to run in non-blocking mode
309            self.log.warning(
310                'iperf traffic not currently supported with power measurement')
311
312    def stop_single_throughput_measurement(self, testcase_params):
313        result = collections.OrderedDict()
314        if testcase_params['endc_combo_config']['nr_cell_count']:
315            result['nr_bler_result'] = self.keysight_test_app.get_bler_result(
316                'NR5G', testcase_params['endc_combo_config']['nr_dl_carriers'],
317                testcase_params['endc_combo_config']['nr_ul_carriers'],
318                testcase_params['bler_measurement_length'])
319            result['nr_tput_result'] = self.keysight_test_app.get_throughput(
320                'NR5G', testcase_params['endc_combo_config']['nr_dl_carriers'],
321                testcase_params['endc_combo_config']['nr_ul_carriers'])
322        if testcase_params['endc_combo_config']['lte_cell_count']:
323            result['lte_bler_result'] = self.keysight_test_app.get_bler_result(
324                cell_type='LTE',
325                dl_cells=testcase_params['endc_combo_config']
326                ['lte_dl_carriers'],
327                ul_cells=testcase_params['endc_combo_config']
328                ['lte_ul_carriers'],
329                length=testcase_params['bler_measurement_length'])
330            result['lte_tput_result'] = self.keysight_test_app.get_throughput(
331                'LTE', testcase_params['endc_combo_config']['lte_dl_carriers'],
332                testcase_params['endc_combo_config']['lte_ul_carriers'])
333        return result
334
335    def run_single_throughput_measurement(self, testcase_params):
336        self.start_single_throughput_measurement(testcase_params)
337        result = self.stop_single_throughput_measurement(testcase_params)
338        return result
339
340    #@suspend_logging
341    def meausre_power_silently(self, measurement_time, measurement_wait,
342                               data_path, measurement_tag):
343        measurement_name = '{}_{}'.format(self.test_name, measurement_tag)
344        measurement_args = dict(duration=measurement_time,
345                                measure_after_seconds=measurement_wait,
346                                hz=self.power_monitor_config['frequency'])
347
348        self.power_monitor.measure(measurement_args=measurement_args,
349                                   measurement_name=measurement_name,
350                                   start_time=self.device_to_host_offset,
351                                   monsoon_output_path=data_path)
352
353    def collect_power_data(self,
354                           measurement_time,
355                           measurement_wait,
356                           reconnect_usb=0,
357                           measurement_tag=0):
358        """Measure power, plot and take log if needed.
359
360        Returns:
361            A MonsoonResult object.
362            measurement_time: length of power measurement
363            measurement_wait: wait before measurement(within monsoon controller)
364            measurement_tag: tag to append to file names
365        """
366        if self.dut.is_connected():
367            self.dut_utils.go_to_sleep()
368            self.dut_utils.stop_services()
369            time.sleep(SHORT_SLEEP)
370            self.dut_utils.log_odpm(
371                os.path.join(
372                    context.get_current_context().get_full_output_path(),
373                    '{}_odpm_{}_{}.txt'.format(self.test_name, measurement_tag, 'start')))
374            self.power_monitor.disconnect_usb()
375        else:
376            self.log.info('DUT already disconnected. Skipping USB operations.')
377
378        self.log.info('Starting power measurement. Duration: {}s. Offset: '
379                      '{}s. Voltage: {} V.'.format(
380                          measurement_time, measurement_wait,
381                          self.power_monitor_config['voltage']))
382        # Collecting current measurement data and plot
383        tag = '{}_{}'.format(self.test_name, measurement_tag)
384        data_path = os.path.join(
385            context.get_current_context().get_full_output_path(),
386            '{}.txt'.format(tag))
387        self.meausre_power_silently(measurement_time, measurement_wait,
388                                    data_path, measurement_tag)
389        self.power_monitor.release_resources()
390        if hasattr(self, 'bitses') and self.bits_root_rail_csv_export:
391            path = os.path.join(
392                context.get_current_context().get_full_output_path(), 'Kibble')
393            os.makedirs(path, exist_ok=True)
394            self.power_monitor.get_bits_root_rail_csv_export(
395                path, '{}_{}'.format(self.test_name, measurement_tag))
396        samples = self.power_monitor.get_waveform(file_path=data_path)
397
398        if reconnect_usb:
399            self.log.info('Reconnecting USB.')
400            self.power_monitor.connect_usb()
401            self.dut.wait_for_boot_completion()
402            time.sleep(LONG_SLEEP)
403            # Save ODPM if applicable
404            self.dut_utils.log_odpm(
405                os.path.join(
406                    context.get_current_context().get_full_output_path(),
407                    '{}_odpm_{}_{}.txt'.format(self.test_name, measurement_tag, 'end')))
408            # Restart Sl4a and other services
409            self.dut_utils.start_services()
410
411        measurement_samples = [sample[1] for sample in samples]
412        average_measurement = sum(measurement_samples) * 1000 / len(measurement_samples)
413        if self.power_monitor_config['measurement_type'] == 'current':
414            average_power = average_measurement * self.power_monitor_config['voltage']
415        else:
416            average_power = average_measurement
417        self.log.info('Average power : {}'.format(average_power))
418        plot_title = '{}_{}'.format(self.test_name, measurement_tag)
419        power_plot_utils.current_waveform_plot(
420            samples, self.power_monitor_config['voltage'],
421            context.get_current_context().get_full_output_path(), plot_title)
422
423        return average_power
424
425    @wputils.nonblocking
426    def collect_power_data_nonblocking(self,
427                           measurement_time,
428                           measurement_wait,
429                           reconnect_usb=0,
430                           measurement_tag=0):
431        return self.collect_power_data(
432            measurement_time, measurement_wait, reconnect_usb, measurement_tag)
433
434    def print_throughput_result(self, result):
435        # Print Test Summary
436        if 'nr_tput_result' in result:
437
438            self.log.info(
439                "NR5G DL PHY Tput (Mbps) (Min/Avg/Max/Th): {:.2f} / {:.2f} / {:.2f} / {:.2f}\tBLER: {:.2f}"
440                .format(
441                    result['nr_tput_result']['total']['DL']['min_tput'],
442                    result['nr_tput_result']['total']['DL']['average_tput'],
443                    result['nr_tput_result']['total']['DL']['max_tput'],
444                    result['nr_tput_result']['total']['DL']
445                    ['theoretical_tput'],
446                    result['nr_bler_result']['total']['DL']['nack_ratio'] *
447                    100))
448            self.log.info(
449                "NR5G UL PHY Tput (Mbps) (Min/Avg/Max/Th): {:.2f} / {:.2f} / {:.2f} / {:.2f}\tBLER: {:.2f}"
450                .format(
451                    result['nr_tput_result']['total']['UL']['min_tput'],
452                    result['nr_tput_result']['total']['UL']['average_tput'],
453                    result['nr_tput_result']['total']['UL']['max_tput'],
454                    result['nr_tput_result']['total']['UL']
455                    ['theoretical_tput'],
456                    result['nr_bler_result']['total']['UL']['nack_ratio'] *
457                    100))
458        if 'lte_tput_result' in result:
459            self.log.info(
460                "LTE DL PHY Tput (Mbps) (Min/Avg/Max/Th): {:.2f} / {:.2f} / {:.2f} / {:.2f}\tBLER: {:.2f}"
461                .format(
462                    result['lte_tput_result']['total']['DL']['min_tput'],
463                    result['lte_tput_result']['total']['DL']['average_tput'],
464                    result['lte_tput_result']['total']['DL']['max_tput'],
465                    result['lte_tput_result']['total']['DL']
466                    ['theoretical_tput'],
467                    result['lte_bler_result']['total']['DL']['nack_ratio'] *
468                    100))
469            if self.testclass_params['lte_ul_mac_padding']:
470                self.log.info(
471                    "LTE UL PHY Tput (Mbps) (Min/Avg/Max/Th): {:.2f} / {:.2f} / {:.2f} / {:.2f}\tBLER: {:.2f}"
472                    .format(
473                        result['lte_tput_result']['total']['UL']['min_tput'],
474                        result['lte_tput_result']['total']['UL']
475                        ['average_tput'],
476                        result['lte_tput_result']['total']['UL']['max_tput'],
477                        result['lte_tput_result']['total']['UL']
478                        ['theoretical_tput'],
479                        result['lte_bler_result']['total']['UL']['nack_ratio']
480                        * 100))
481            if self.testclass_params['traffic_type'] != 'PHY':
482                self.log.info("{} Tput: {:.2f} Mbps".format(
483                    self.testclass_params['traffic_type'],
484                    result['iperf_throughput']))
485
486    def setup_tester(self, testcase_params):
487
488        # Configure all cells
489        self.keysight_test_app.toggle_contiguous_nr_channels(0)
490        for cell_idx, cell in enumerate(
491                testcase_params['endc_combo_config']['cell_list']):
492            self.keysight_test_app.enable_awgn_noise(cell['cell_type'], cell['cell_number'], 0)
493            self.keysight_test_app.set_channel_emulator_state(0)
494
495            if cell['cell_type'] == 'NR5G':
496                self.keysight_test_app.set_nr_cell_type(
497                    cell['cell_type'], cell['cell_number'],
498                    cell['nr_cell_type'])
499            self.keysight_test_app.set_cell_duplex_mode(
500                cell['cell_type'], cell['cell_number'], cell['duplex_mode'])
501            self.keysight_test_app.set_cell_band(cell['cell_type'],
502                                                 cell['cell_number'],
503                                                 cell['band'])
504            self.keysight_test_app.set_cell_dl_power(
505                cell['cell_type'], cell['cell_number'],
506                testcase_params['cell_power_sweep'][cell_idx][0], 0)
507            self.keysight_test_app.set_cell_input_power(
508                cell['cell_type'], cell['cell_number'],
509                self.testclass_params['input_power'][cell['cell_type']])
510            if cell['cell_type'] == 'LTE' and cell['pcc'] == 0:
511                pass
512            else:
513                self.keysight_test_app.set_cell_ul_power_control(
514                    cell['cell_type'], cell['cell_number'],
515                    self.testclass_params['ul_power_control_mode'],
516                    self.testclass_params.get('ul_power_control_target', 0))
517            if cell['cell_type'] == 'NR5G':
518                self.keysight_test_app.set_nr_subcarrier_spacing(
519                    cell['cell_number'], cell['subcarrier_spacing'])
520            if 'channel' in cell and cell['channel'] is not None:
521                self.keysight_test_app.set_cell_channel(
522                    cell['cell_type'], cell['cell_number'], cell['channel'])
523            self.keysight_test_app.set_cell_bandwidth(cell['cell_type'],
524                                                      cell['cell_number'],
525                                                      cell['dl_bandwidth'])
526            self.keysight_test_app.set_cell_mimo_config(
527                cell['cell_type'], cell['cell_number'], 'DL',
528                cell['dl_mimo_config'])
529            if cell['cell_type'] == 'LTE':
530                self.keysight_test_app.set_lte_cell_transmission_mode(
531                    cell['cell_number'], cell['transmission_mode'])
532                self.keysight_test_app.set_lte_cell_num_codewords(
533                    cell['cell_number'], cell['num_codewords'])
534                self.keysight_test_app.set_lte_cell_num_layers(
535                    cell['cell_number'], cell['num_layers'])
536
537                # self.keysight_test_app.set_lte_cell_dl_subframe_allocation(
538                #     cell['cell_number'], cell['dl_subframe_allocation'])
539                # self.keysight_test_app.set_lte_cell_tdd_frame_config(
540                #     cell['cell_number'], cell['tdd_frame_config'], cell['tdd_ssf_config'])
541                # self.keysight_test_app.set_lte_control_region_size(
542                #     cell['cell_number'], 1)
543                # self.keysight_test_app.set_lte_cell_mcs(
544                #     cell['cell_number'], testcase_params['lte_dl_mcs_table'],
545                #     testcase_params['lte_dl_mcs'],
546                #     testcase_params['lte_ul_mcs_table'],
547                #     testcase_params['lte_ul_mcs'])
548                # self.keysight_test_app.set_lte_ul_mac_padding(
549                #     self.testclass_params['lte_ul_mac_padding'])
550            if cell['ul_enabled'] and cell['cell_type'] == 'NR5G':
551                self.keysight_test_app.set_cell_mimo_config(
552                    cell['cell_type'], cell['cell_number'], 'UL',
553                    cell['ul_mimo_config'])
554            if 'fading_scenario' in self.testclass_params:
555                self.keysight_test_app.configure_channel_emulator(
556                    cell['cell_type'], cell['cell_number'],
557                    self.testclass_params['fading_scenario'][
558                        cell['cell_type']])
559
560        if testcase_params.get('force_contiguous_nr_channel', False):
561            self.keysight_test_app.toggle_contiguous_nr_channels(1)
562
563        if testcase_params['endc_combo_config']['nr_cell_count']:
564            #if 'schedule_scenario' in testcase_params:
565            #     self.keysight_test_app.set_nr_cell_schedule_scenario(
566            #         'CELL1', testcase_params['schedule_scenario'])
567            #     if testcase_params['schedule_scenario'] == 'FULL_TPUT':
568            #         self.keysight_test_app.set_nr_schedule_slot_ratio(
569            #             'CELL1', testcase_params['schedule_slot_ratio'])
570            #         self.keysight_test_app.set_nr_schedule_tdd_pattern(
571            #             'CELL1', testcase_params.get('tdd_pattern', 0))
572            # self.keysight_test_app.set_nr_ul_dft_precoding(
573            #     'CELL1', testcase_params['transform_precoding'])
574            # self.keysight_test_app.set_nr_cell_mcs(
575            #     'CELL1', testcase_params['nr_dl_mcs_table'], testcase_params['nr_dl_mcs'],
576            #     testcase_params['nr_ul_mcs_table'],
577            #     testcase_params['nr_ul_mcs'])
578            self.keysight_test_app.set_dl_carriers(
579                testcase_params['endc_combo_config']['nr_dl_carriers'])
580            self.keysight_test_app.set_ul_carriers(
581                testcase_params['endc_combo_config']['nr_ul_carriers'])
582
583        if testcase_params['endc_combo_config']['lte_cell_count']:
584            # Connect flow for LTE and LTE+FR1 ENDC
585            # Turn on LTE cells
586            for cell in testcase_params['endc_combo_config']['cell_list']:
587                if cell['cell_type'] == 'LTE' and not self.keysight_test_app.get_cell_state(
588                        cell['cell_type'], cell['cell_number']):
589                    self.log.info('Turning LTE Cell {} on.'.format(
590                        cell['cell_number']))
591                    self.keysight_test_app.set_cell_state(
592                        cell['cell_type'], cell['cell_number'], 1)
593            self.log.info('Waiting for LTE connections')
594            # Turn airplane mode off
595            num_apm_toggles = 10
596            for idx in range(num_apm_toggles):
597                self.log.info('Turning off airplane mode')
598                self.dut_utils.toggle_airplane_mode(False, False, idx)
599                if self.keysight_test_app.wait_for_cell_status(
600                        'LTE', 'CELL1', 'CONN', 10 * (idx + 1)):
601                    self.log.info('Connected! Waiting for {} seconds.'.format(
602                        LONG_SLEEP))
603                    time.sleep(LONG_SLEEP)
604                    break
605                elif idx < num_apm_toggles - 1:
606                    self.log.info('Turning on airplane mode')
607                    self.dut_utils.toggle_airplane_mode(True, False, idx)
608                    time.sleep(MEDIUM_SLEEP)
609                else:
610                    asserts.fail('DUT did not connect to LTE.')
611            # Activate LTE aggregation if applicable
612            if testcase_params['endc_combo_config']['lte_scc_list']:
613                self.keysight_test_app.apply_lte_carrier_agg(
614                    testcase_params['endc_combo_config']['lte_scc_list'])
615
616            if testcase_params['endc_combo_config']['nr_cell_count']:
617                self.keysight_test_app.apply_carrier_agg()
618                self.log.info('Waiting for 5G connection')
619                connected = self.keysight_test_app.wait_for_cell_status(
620                    'NR5G',
621                    testcase_params['endc_combo_config']['nr_cell_count'],
622                    ['ACT', 'CONN'], 60)
623                if not connected:
624                    asserts.fail('DUT did not connect to NR.')
625            time.sleep(SHORT_SLEEP)
626        elif testcase_params['endc_combo_config']['nr_cell_count']:
627            # Connect flow for NR FR1 Standalone
628            # Turn on NR cells
629            for cell in testcase_params['endc_combo_config']['cell_list']:
630                if cell['cell_type'] == 'NR5G' and not self.keysight_test_app.get_cell_state(
631                        cell['cell_type'], cell['cell_number']):
632                    self.log.info('Turning NR Cell {} on.'.format(
633                        cell['cell_number']))
634                    self.keysight_test_app.set_cell_state(
635                        cell['cell_type'], cell['cell_number'], 1)
636            num_apm_toggles = 10
637            for idx in range(num_apm_toggles):
638                self.log.info('Turning off airplane mode now.')
639                self.dut_utils.toggle_airplane_mode(False, False, idx)
640                if self.keysight_test_app.wait_for_cell_status(
641                        'NR5G', 'CELL1', 'CONN', 10 * (idx + 1)):
642                    self.log.info('Connected! Waiting for {} seconds.'.format(
643                        LONG_SLEEP))
644                    time.sleep(10*LONG_SLEEP)
645                    break
646                elif idx < num_apm_toggles - 1:
647                    self.log.info('Turning on airplane mode now.')
648                    self.dut_utils.toggle_airplane_mode(True, False, idx)
649                    time.sleep(MEDIUM_SLEEP)
650                else:
651                    asserts.fail('DUT did not connect to NR.')
652
653        #AWGN and fading are turned on after CELL ON and connect due to bug in UXM
654        for cell in testcase_params['endc_combo_config']['cell_list']:
655            if 'awgn_noise_level' in self.testclass_params:
656                self.keysight_test_app.enable_awgn_noise(cell['cell_type'], cell['cell_number'],
657                                                         1,
658                                                         self.testclass_params['awgn_noise_level'])
659
660        if 'fading_scenario' in self.testclass_params and self.testclass_params[
661                'fading_scenario']['enable']:
662            self.log.info('Enabling fading.')
663            self.keysight_test_app.set_channel_emulator_state(
664                self.testclass_params['fading_scenario']['enable'])
665
666    def _test_throughput_bler(self, testcase_params):
667        """Test function to run cellular throughput and BLER measurements.
668
669        The function runs BLER/throughput measurement after configuring the
670        callbox and DUT. The test supports running PHY or TCP/UDP layer traffic
671        in a variety of band/carrier/mcs/etc configurations.
672
673        Args:
674            testcase_params: dict containing test-specific parameters
675        Returns:
676            result: dict containing throughput results and meta data
677        """
678        # Prepare results dicts
679        testcase_params = self.compile_test_params(testcase_params)
680        testcase_results = collections.OrderedDict()
681        testcase_results['testcase_params'] = testcase_params
682        testcase_results['results'] = []
683
684        # Setup ota chamber if needed
685        if hasattr(self,
686                   'keysight_chamber') and 'orientation' in testcase_params:
687            self.keysight_chamber.move_theta_phi_abs(
688                self.keysight_chamber.preset_orientations[
689                    testcase_params['orientation']]['theta'],
690                self.keysight_chamber.preset_orientations[
691                    testcase_params['orientation']]['phi'])
692
693        # Setup tester and wait for DUT to connect
694        self.setup_tester(testcase_params)
695        # Put DUT to sleep for power measurements
696        self.dut_utils.go_to_sleep()
697
698        # Run throughput test loop
699        stop_counter = 0
700        if testcase_params['endc_combo_config']['nr_cell_count']:
701            self.keysight_test_app.select_display_tab('NR5G', 1, 'BTHR',
702                                                      'OTAGRAPH')
703        else:
704            self.keysight_test_app.select_display_tab('LTE', 1, 'BTHR',
705                                                      'OTAGRAPH')
706        for power_idx in range(len(testcase_params['cell_power_sweep'][0])):
707            result = collections.OrderedDict()
708            # Check that cells are still connected
709            connected = 1
710            for cell in testcase_params['endc_combo_config']['cell_list']:
711                if not self.keysight_test_app.wait_for_cell_status(
712                        cell['cell_type'], cell['cell_number'],
713                    ['ACT', 'CONN'], LONG_SLEEP, VERY_SHORT_SLEEP):
714                    connected = 0
715            if not connected:
716                self.log.info('DUT lost connection to cells. Ending test.')
717                break
718            # Set DL cell power
719            for cell_idx, cell in enumerate(
720                    testcase_params['endc_combo_config']['cell_list']):
721                cell_power_array = []
722                current_cell_power = testcase_params['cell_power_sweep'][
723                    cell_idx][power_idx]
724                cell_power_array.append(current_cell_power)
725                self.keysight_test_app.set_cell_dl_power(
726                    cell['cell_type'], cell['cell_number'], current_cell_power,
727                    0)
728            result['cell_power'] = cell_power_array
729            # Start BLER and throughput measurements
730            self.log.info('Cell powers: {}'.format(cell_power_array))
731            self.start_single_throughput_measurement(testcase_params)
732            if self.power_monitor:
733                measurement_wait = LONG_SLEEP if (power_idx == 0) else 0
734                average_power = self.collect_power_data(
735                    self.testclass_params['traffic_duration'],
736                    measurement_wait,
737                    reconnect_usb=0,
738                    measurement_tag=power_idx)
739                result['average_power'] = average_power
740            current_throughput = self.stop_single_throughput_measurement(
741                testcase_params)
742            result['throughput_measurements'] = current_throughput
743            self.print_throughput_result(current_throughput)
744
745            if self.testclass_params.get('log_rsrp_metrics', 1) and self.dut.is_connected():
746                lte_rx_meas = self.dut_utils.get_rx_measurements('LTE')
747                nr_rx_meas = self.dut_utils.get_rx_measurements('NR5G')
748                result['lte_rx_measurements'] = lte_rx_meas
749                result['nr_rx_measurements'] = nr_rx_meas
750                self.log.info('LTE Rx Measurements: {}'.format(lte_rx_meas))
751                self.log.info('NR Rx Measurements: {}'.format(nr_rx_meas))
752
753            testcase_results['results'].append(result)
754            if (('lte_bler_result' in result['throughput_measurements']
755                 and result['throughput_measurements']['lte_bler_result']
756                 ['total']['DL']['nack_ratio'] * 100 > 99)
757                    or ('nr_bler_result' in result['throughput_measurements']
758                        and result['throughput_measurements']['nr_bler_result']
759                        ['total']['DL']['nack_ratio'] * 100 > 99)):
760                stop_counter = stop_counter + 1
761            else:
762                stop_counter = 0
763            if stop_counter == STOP_COUNTER_LIMIT:
764                break
765
766        # Save results
767        self.testclass_results[self.current_test_name] = testcase_results
768
769    def dut_rockbottom(self):
770        """Set the dut to rockbottom state
771
772        """
773        # The rockbottom script might include a device reboot, so it is
774        # necessary to stop SL4A during its execution.
775        self.dut.stop_services()
776        self.log.info('Executing rockbottom script for ' + self.dut.model)
777        os.system('{} {}'.format('/root/rockbottom_km4.sh', self.dut.serial))
778        # Make sure the DUT is in root mode after coming back
779        self.dut.root_adb()
780        # Restart SL4A
781        self.dut.start_services()
782
783    def test_measure_power(self):
784
785        self.dut_rockbottom()
786        self.log.info('Turing screen off')
787        self.dut_utils.set_screen_state(0)
788        self.dut_utils.toggle_airplane_mode(True, False)
789        self.dut_utils.go_to_sleep()
790        time.sleep(10)
791        self.log.info('Measuring power now.')
792        self.collect_power_data(600, 10)
793