• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3.4
2#
3#   Copyright 2021 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the 'License');
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an 'AS IS' BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17import collections
18import pyvisa
19import time
20from acts import logger
21from acts_contrib.test_utils.cellular.performance import cellular_performance_test_utils as cputils
22
23SHORT_SLEEP = 1
24VERY_SHORT_SLEEP = 0.1
25SUBFRAME_DURATION = 0.001
26VISA_QUERY_DELAY = 0.01
27
28
29class Keysight5GTestApp(object):
30    """Controller for the Keysight 5G NR Test Application.
31
32    This controller enables interacting with a Keysight Test Application
33    running on a remote test PC and implements many of the configuration
34    parameters supported in test app.
35    """
36
37    VISA_LOCATION = '/opt/keysight/iolibs/libktvisa32.so'
38
39    def __init__(self, config):
40        self.config = config
41        self.log = logger.create_tagged_trace_logger("{}{}".format(
42            self.config['brand'], self.config['model']))
43        self.resource_manager = pyvisa.ResourceManager(self.VISA_LOCATION)
44        self.test_app = self.resource_manager.open_resource(
45            'TCPIP0::{}::{}::INSTR'.format(self.config['ip_address'],
46                                           self.config['hislip_interface']))
47        self.test_app.timeout = 200000
48        self.test_app.write_termination = '\n'
49        self.test_app.read_termination = '\n'
50        self.test_app.query_delay = VISA_QUERY_DELAY
51        self.last_loaded_scpi = None
52
53        inst_id = self.send_cmd('*IDN?', 1)
54        if 'Keysight' not in inst_id[0]:
55            self.log.error(
56                'Failed to connect to Keysight Test App: {}'.format(inst_id))
57        else:
58            self.log.info("Test App ID: {}".format(inst_id))
59
60    def destroy(self):
61        self.test_app.close()
62
63    ### Programming Utilities
64    @staticmethod
65    def _format_cells(cells):
66        "Helper function to format list of cells."
67        if isinstance(cells, int):
68            return 'CELL{}'.format(cells)
69        elif isinstance(cells, str):
70            return cells
71        elif isinstance(cells, list):
72            cell_list = [
73                Keysight5GTestApp._format_cells(cell) for cell in cells
74            ]
75            cell_list = ','.join(cell_list)
76            return cell_list
77
78    @staticmethod
79    def _format_response(response):
80        "Helper function to format test app response."
81
82        def _format_response_entry(entry):
83            try:
84                formatted_entry = float(entry)
85            except:
86                formatted_entry = entry
87            return formatted_entry
88
89        if ',' not in response:
90            return _format_response_entry(response)
91        response = response.split(',')
92        formatted_response = [
93            _format_response_entry(entry) for entry in response
94        ]
95        return formatted_response
96
97    def send_cmd(self, command, read_response=0, check_errors=1):
98        "Helper function to write to or query test app."
99        if read_response:
100            try:
101                response = Keysight5GTestApp._format_response(
102                    self.test_app.query(command))
103                time.sleep(VISA_QUERY_DELAY)
104                if check_errors:
105                    error = self.test_app.query('SYSTem:ERRor?')
106                    time.sleep(VISA_QUERY_DELAY)
107                    if 'No error' not in error:
108                        self.log.warning("Command: {}. Error: {}".format(
109                            command, error))
110                return response
111            except:
112                raise RuntimeError('Lost connection to test app.')
113        else:
114            try:
115                self.test_app.write(command)
116                time.sleep(VISA_QUERY_DELAY)
117                if check_errors:
118                    error = self.test_app.query('SYSTem:ERRor?')
119                    if 'No error' not in error:
120                        self.log.warning("Command: {}. Error: {}".format(
121                            command, error))
122                self.send_cmd('*OPC?', 1)
123                time.sleep(VISA_QUERY_DELAY)
124            except:
125                raise RuntimeError('Lost connection to test app.')
126            return None
127
128    def import_scpi_file(self, file_name, check_last_loaded=0):
129        """Function to import SCPI file specified in file_name.
130
131        Args:
132            file_name: name of SCPI file to run
133            check_last_loaded: flag to check last loaded scpi and
134            only load if different.
135        """
136        if file_name == self.last_loaded_scpi and check_last_loaded:
137            self.log.info('Skipping SCPI import.')
138        self.send_cmd("SYSTem:SCPI:IMPort '{}'".format(file_name))
139        while int(self.send_cmd('SYSTem:SCPI:IMPort:STATus?', 1)):
140            self.send_cmd('*OPC?', 1)
141        self.log.info('Done with SCPI import')
142
143    ### Configure Cells
144    def assert_cell_off_decorator(func):
145        "Decorator function that ensures cells or off when configuring them"
146
147        def inner(self, *args, **kwargs):
148            if "nr" in func.__name__:
149                cell_type = 'NR5G'
150            else:
151                cell_type = kwargs.get('cell_type', args[0])
152            cell = kwargs.get('cell', args[1])
153            cell_state = self.get_cell_state(cell_type, cell)
154            if cell_state:
155                self.log.error('Cell must be off when calling {}'.format(
156                    func.__name__))
157            return (func(self, *args, **kwargs))
158
159        return inner
160
161    def assert_cell_off(self, cell_type, cell):
162        cell_state = self.get_cell_state(cell_type, cell)
163        if cell_state:
164            self.log.error('Cell must be off')
165
166    def select_cell(self, cell_type, cell):
167        """Function to select active cell.
168
169        Args:
170            cell_type: LTE or NR5G cell
171            cell: cell/carrier number
172        """
173        self.send_cmd('BSE:SELected:CELL {},{}'.format(
174            cell_type, Keysight5GTestApp._format_cells(cell)))
175
176    def select_display_tab(self, cell_type, cell, tab, subtab):
177        """Function to select display tab.
178
179        Args:
180            cell_type: LTE or NR5G cell
181            cell: cell/carrier number
182            tab: tab to display for the selected cell
183        """
184        supported_tabs = {
185            'PHY': [
186                'BWP', 'HARQ', 'PDSCH', 'PDCCH', 'PRACH', 'PUSCH', 'PUCCH',
187                'SRSC'
188            ],
189            'BTHR': ['SUMMARY', 'OTAGRAPH', 'ULOTA', 'DLOTA'],
190            'CSI': []
191        }
192        if (tab not in supported_tabs) or (subtab not in supported_tabs[tab]):
193            return
194        self.select_cell(cell_type, cell)
195        self.send_cmd('DISPlay:{} {},{}'.format(cell_type, tab, subtab))
196
197    def get_cell_state(self, cell_type, cell):
198        """Function to get cell on/off state.
199
200        Args:
201            cell_type: LTE or NR5G cell
202            cell: cell/carrier number
203        Returns:
204            cell_state: boolean. True if cell on
205        """
206        cell_state = int(
207            self.send_cmd(
208                'BSE:CONFig:{}:{}:ACTive:STATe?'.format(
209                    cell_type, Keysight5GTestApp._format_cells(cell)), 1))
210        return cell_state
211
212    def wait_for_cell_status(self,
213                             cell_type,
214                             cell,
215                             states,
216                             timeout,
217                             polling_interval=SHORT_SLEEP):
218        """Function to wait for a specific cell status
219
220        Args:
221            cell_type: LTE or NR5G cell
222            cell: cell/carrier number
223            states: list of acceptable states (ON, CONN, AGG, ACT, etc)
224            timeout: amount of time to wait for requested status
225        Returns:
226            True if one of the listed states is achieved
227            False if timed out waiting for acceptable state.
228        """
229        states = [states] if isinstance(states, str) else states
230        for i in range(int(timeout / polling_interval)):
231            current_state = self.send_cmd(
232                'BSE:STATus:{}:{}?'.format(
233                    cell_type, Keysight5GTestApp._format_cells(cell)), 1)
234            if current_state in states:
235                return True
236            time.sleep(polling_interval)
237        self.log.warning('Timeout waiting for {} {} {}'.format(
238            cell_type, Keysight5GTestApp._format_cells(cell), states))
239        return False
240
241    def set_cell_state(self, cell_type, cell, state):
242        """Function to set cell state
243
244        Args:
245            cell_type: LTE or NR5G cell
246            cell: cell/carrier number
247            state: requested state
248        """
249        self.send_cmd('BSE:CONFig:{}:{}:ACTive:STATe {}'.format(
250            cell_type, Keysight5GTestApp._format_cells(cell), state))
251
252    def set_cell_type(self, cell_type, cell, sa_or_nsa):
253        """Function to set cell duplex mode
254
255        Args:
256            cell_type: LTE or NR5G cell
257            cell: cell/carrier number
258            sa_or_nsa: SA or NSA
259        """
260        self.assert_cell_off(cell_type, cell)
261        self.send_cmd('BSE: CONFig:NR5G:{}:TYPE {}'.format(
262            Keysight5GTestApp._format_cells(cell), sa_or_nsa))
263
264    def set_cell_duplex_mode(self, cell_type, cell, duplex_mode):
265        """Function to set cell duplex mode
266
267        Args:
268            cell_type: LTE or NR5G cell
269            cell: cell/carrier number
270            duplex_mode: TDD or FDD
271        """
272        self.assert_cell_off(cell_type, cell)
273        self.send_cmd('BSE:CONFig:{}:{}:DUPLEX:MODe {}'.format(
274            cell_type, Keysight5GTestApp._format_cells(cell), duplex_mode))
275
276    def set_cell_band(self, cell_type, cell, band):
277        """Function to set cell band
278
279        Args:
280            cell_type: LTE or NR5G cell
281            cell: cell/carrier number
282            band: LTE or NR band (e.g. 1,3,N260, N77)
283        """
284        self.assert_cell_off(cell_type, cell)
285        self.send_cmd('BSE:CONFig:{}:{}:BAND {}'.format(
286            cell_type, Keysight5GTestApp._format_cells(cell), band))
287
288    def set_cell_channel(self, cell_type, cell, channel, arfcn=1):
289        """Function to set cell frequency/channel
290
291        Args:
292            cell_type: LTE or NR5G cell
293            cell: cell/carrier number
294            channel: requested channel (ARFCN) or frequency in MHz
295        """
296        self.assert_cell_off(cell_type, cell)
297        if cell_type == 'NR5G' and isinstance(
298                channel, str) and channel.lower() in ['low', 'mid', 'high']:
299            self.send_cmd('BSE:CONFig:{}:{}:TESTChanLoc {}'.format(
300                cell_type, Keysight5GTestApp._format_cells(cell), channel.upper()))
301        elif arfcn == 1:
302            self.send_cmd('BSE:CONFig:{}:{}:DL:CHANnel {}'.format(
303                cell_type, Keysight5GTestApp._format_cells(cell), channel))
304        else:
305            self.send_cmd('BSE:CONFig:{}:{}:DL:FREQuency:MAIN {}'.format(
306                cell_type, Keysight5GTestApp._format_cells(cell),
307                channel * 1e6))
308
309    def toggle_contiguous_nr_channels(self, force_contiguous):
310        self.assert_cell_off('NR5G', 1)
311        self.log.warning('Forcing contiguous NR channels overrides channel config.')
312        self.send_cmd('BSE:CONFig:NR5G:PHY:OPTimize:CONTiguous:STATe 0')
313        if force_contiguous:
314            self.send_cmd('BSE:CONFig:NR5G:PHY:OPTimize:CONTiguous:STATe 1')
315
316    def configure_contiguous_nr_channels(self, cell, band, channel):
317        """Function to set cell frequency/channel
318
319        Args:
320            cell: cell/carrier number
321            band: band to set channel in (only required for preset)
322            channel_preset: frequency in MHz or preset in [low, mid, or high]
323        """
324        self.assert_cell_off('NR5G', cell)
325        self.send_cmd('BSE:CONFig:NR5G:PHY:OPTimize:CONTiguous:STATe 0')
326        if channel.lower() in ['low', 'mid', 'high']:
327            pcc_arfcn = cputils.PCC_PRESET_MAPPING[band][channel]
328            self.set_cell_channel('NR5G', cell, pcc_arfcn, 1)
329        else:
330            self.set_cell_channel('NR5G', cell, channel, 0)
331        self.send_cmd('BSE:CONFig:NR5G:PHY:OPTimize:CONTiguous:STATe 1')
332
333    def configure_noncontiguous_nr_channels(self, cells, band, channels):
334        """Function to set cell frequency/channel
335
336        Args:
337            cell: cell/carrier number
338            band: band number
339            channel: frequency in MHz
340        """
341        for cell in cells:
342            self.assert_cell_off('NR5G', cell)
343        self.send_cmd('BSE:CONFig:NR5G:PHY:OPTimize:CONTiguous:STATe 0')
344        for cell, channel in zip(cells, channels):
345            self.set_cell_channel('NR5G', cell, channel, arfcn=0)
346
347    def set_cell_bandwidth(self, cell_type, cell, bandwidth):
348        """Function to set cell bandwidth
349
350        Args:
351            cell_type: LTE or NR5G cell
352            cell: cell/carrier number
353            bandwidth: requested bandwidth
354        """
355        self.assert_cell_off(cell_type, cell)
356        self.send_cmd('BSE:CONFig:{}:{}:DL:BW {}'.format(
357            cell_type, Keysight5GTestApp._format_cells(cell), bandwidth))
358
359    def set_nr_subcarrier_spacing(self, cell, subcarrier_spacing):
360        """Function to set cell bandwidth
361
362        Args:
363            cell: cell/carrier number
364            subcarrier_spacing: requested SCS
365        """
366        self.assert_cell_off('NR5G', cell)
367        self.send_cmd('BSE:CONFig:NR5G:{}:SUBCarrier:SPACing:COMMon {}'.format(
368            Keysight5GTestApp._format_cells(cell), subcarrier_spacing))
369
370    def set_cell_mimo_config(self, cell_type, cell, link, mimo_config):
371        """Function to set cell mimo config.
372
373        Args:
374            cell_type: LTE or NR5G cell
375            cell: cell/carrier number
376            link: uplink or downlink
377            mimo_config: requested mimo configuration (refer to SCPI
378                         documentation for allowed range of values)
379        """
380        self.assert_cell_off(cell_type, cell)
381        if cell_type == 'NR5G':
382            self.send_cmd('BSE:CONFig:{}:{}:{}:MIMO:CONFig {}'.format(
383                cell_type, Keysight5GTestApp._format_cells(cell), link,
384                mimo_config))
385        else:
386            self.send_cmd('BSE:CONFig:{}:{}:PHY:DL:ANTenna:CONFig {}'.format(
387                cell_type, Keysight5GTestApp._format_cells(cell), mimo_config))
388
389    def set_lte_cell_transmission_mode(self, cell, transmission_mode):
390        """Function to set LTE cell transmission mode.
391
392        Args:
393            cell: cell/carrier number
394            transmission_mode: one of TM1, TM2, TM3, TM4 ...
395        """
396        self.assert_cell_off('LTE', cell)
397        self.send_cmd('BSE:CONFig:LTE:{}:RRC:TMODe {}'.format(
398            Keysight5GTestApp._format_cells(cell), transmission_mode))
399
400    def set_cell_dl_power(self, cell_type, cell, power, full_bw):
401        """Function to set cell power
402
403        Args:
404            cell_type: LTE or NR5G cell
405            cell: cell/carrier number
406            power: requested power
407            full_bw: boolean controlling if requested power is per channel
408                     or subcarrier
409        """
410        if full_bw:
411            self.send_cmd('BSE:CONFIG:{}:{}:DL:POWer:CHANnel {}'.format(
412                cell_type, Keysight5GTestApp._format_cells(cell), power))
413        else:
414            self.send_cmd('BSE:CONFIG:{}:{}:DL:POWer:EPRE {}'.format(
415                cell_type, Keysight5GTestApp._format_cells(cell), power))
416        self.send_cmd('BSE:CONFig:{}:APPLY'.format(cell_type))
417
418    def set_cell_duplex_mode(self, cell_type, cell, duplex_mode):
419        """Function to set cell power
420
421        Args:
422            cell_type: LTE or NR5G cell
423            cell: cell/carrier number
424            duplex mode: TDD or FDD
425        """
426        self.assert_cell_off(cell_type, cell)
427        self.send_cmd('BSE:CONFig:{}:{}:DUPLEX:MODe {}'.format(
428            cell_type, Keysight5GTestApp._format_cells(cell), duplex_mode))
429
430    def set_dl_carriers(self, cells):
431        """Function to set aggregated DL NR5G carriers
432
433        Args:
434            cells: list of DL cells/carriers to aggregate with LTE (e.g. [1,2])
435        """
436        self.send_cmd('BSE:CONFig:NR5G:CELL1:CAGGregation:NRCC:DL {}'.format(
437            Keysight5GTestApp._format_cells(cells)))
438
439    def set_ul_carriers(self, cells):
440        """Function to set aggregated UL NR5G carriers
441
442        Args:
443            cells: list of DL cells/carriers to aggregate with LTE (e.g. [1,2])
444        """
445        self.send_cmd('BSE:CONFig:NR5G:CELL1:CAGGregation:NRCC:UL {}'.format(
446            Keysight5GTestApp._format_cells(cells)))
447
448    def set_nr_cell_schedule_scenario(self, cell, scenario):
449        """Function to set NR schedule to one of predefince quick configs.
450
451        Args:
452            cell: cell number to address. schedule will apply to all cells
453            scenario: one of the predefined test app schedlue quick configs
454                      (e.g. FULL_TPUT, BASIC).
455        """
456        self.assert_cell_off('NR5G', cell)
457        self.send_cmd(
458            'BSE:CONFig:NR5G:{}:SCHeduling:QCONFig:SCENario {}'.format(
459                Keysight5GTestApp._format_cells(cell), scenario))
460        self.send_cmd('BSE:CONFig:NR5G:SCHeduling:QCONFig:APPLy:ALL')
461
462    def set_nr_cell_mcs(self, cell, dl_mcs, ul_mcs):
463        """Function to set NR cell DL & UL MCS
464
465        Args:
466            cell: cell number to address. MCS will apply to all cells
467            dl_mcs: mcs index to use on DL
468            ul_mcs: mcs index to use on UL
469        """
470        self.assert_cell_off('NR5G', cell)
471        self.send_cmd(
472            'BSE:CONFig:NR5G:SCHeduling:SETParameter "CELLALL:BWPALL:FCALL:SCALL", "DL:IMCS", "{}"'
473            .format(dl_mcs))
474        self.send_cmd(
475            'BSE:CONFig:NR5G:SCHeduling:SETParameter "CELLALL:BWPALL:FCALL:SCALL", "UL:IMCS", "{}"'
476            .format(ul_mcs))
477
478    def set_lte_cell_mcs(
479        self,
480        cell,
481        dl_mcs_table,
482        dl_mcs,
483        ul_mcs_table,
484        ul_mcs,
485    ):
486        """Function to set NR cell DL & UL MCS
487
488        Args:
489            cell: cell number to address. MCS will apply to all cells
490            dl_mcs: mcs index to use on DL
491            ul_mcs: mcs index to use on UL
492        """
493        if dl_mcs_table == 'QAM256':
494            dl_mcs_table_formatted = 'ASUBframe'
495        elif dl_mcs_table == 'QAM1024':
496            dl_mcs_table_formatted = 'ASUB1024'
497        elif dl_mcs_table == 'QAM64':
498            dl_mcs_table_formatted = 'DISabled'
499        self.assert_cell_off('LTE', cell)
500        self.send_cmd(
501            'BSE:CONFig:LTE:SCHeduling:SETParameter "CELLALL", "DL:MCS:TABle", "{}"'
502            .format(dl_mcs_table_formatted))
503        self.send_cmd(
504            'BSE:CONFig:LTE:SCHeduling:SETParameter "CELLALL:SFALL:CWALL", "DL:IMCS", "{}"'
505            .format(dl_mcs))
506        self.send_cmd(
507            'BSE:CONFig:LTE:SCHeduling:SETParameter "CELLALL", "UL:MCS:TABle", "{}"'
508            .format(ul_mcs_table))
509        self.send_cmd(
510            'BSE:CONFig:LTE:SCHeduling:SETParameter "CELLALL:SFALL", "UL:IMCS", "{}"'
511            .format(ul_mcs))
512
513    def set_lte_control_region_size(self, cell, num_symbols):
514        self.assert_cell_off('LTE', cell)
515        self.send_cmd('BSE:CONFig:LTE:{}:PHY:PCFich:CFI {}'.format(
516            Keysight5GTestApp._format_cells(cell), num_symbols))
517
518    def set_lte_ul_mac_padding(self, mac_padding):
519        self.assert_cell_off('LTE', 'CELL1')
520        padding_str = 'TRUE' if mac_padding else 'FALSE'
521        self.send_cmd(
522            'BSE:CONFig:LTE:SCHeduling:SETParameter "CELLALL", "UL:MAC:PADDING", "{}"'
523            .format(padding_str))
524
525    def set_nr_ul_dft_precoding(self, cell, precoding):
526        """Function to configure DFT-precoding on uplink.
527
528        Args:
529            cell: cell number to address. MCS will apply to all cells
530            precoding: 0/1 to disable/enable precoding
531        """
532        self.assert_cell_off('NR5G', cell)
533        precoding_str = "ENABled" if precoding else "DISabled"
534        self.send_cmd(
535            'BSE:CONFig:NR5G:{}:SCHeduling:QCONFig:UL:TRANsform:PRECoding {}'.
536            format(Keysight5GTestApp._format_cells(cell), precoding_str))
537        precoding_str = "True" if precoding else "False"
538        self.send_cmd(
539            'BSE:CONFig:NR5G:SCHeduling:SETParameter "CELLALL:BWPALL", "UL:TPEnabled", "{}"'
540            .format(precoding_str))
541
542    def configure_ul_clpc(self, channel, mode, target):
543        """Function to configure UL power control on all cells/carriers
544
545        Args:
546            channel: physical channel must be PUSCh or PUCCh
547            mode: mode supported by test app (all up/down bits, target, etc)
548            target: target power if mode is set to target
549        """
550        self.send_cmd('BSE:CONFig:NR5G:UL:{}:CLPControl:MODE:ALL {}'.format(
551            channel, mode))
552        if "tar" in mode.lower():
553            self.send_cmd(
554                'BSE:CONFig:NR5G:UL:{}:CLPControl:TARGet:POWer:ALL {}'.format(
555                    channel, target))
556
557    def apply_lte_carrier_agg(self, cells):
558        """Function to start LTE carrier aggregation on already configured cells"""
559        if self.wait_for_cell_status('LTE', 'CELL1', 'CONN', 60):
560            self.send_cmd(
561                'BSE:CONFig:LTE:CELL1:CAGGregation:AGGRegate:SCC {}'.format(
562                    Keysight5GTestApp._format_cells(cells)))
563            self.send_cmd(
564                'BSE:CONFig:LTE:CELL1:CAGGregation:ACTivate:SCC {}'.format(
565                    Keysight5GTestApp._format_cells(cells)))
566
567    def apply_carrier_agg(self):
568        """Function to start carrier aggregation on already configured cells"""
569        if self.wait_for_cell_status('LTE', 'CELL1', 'CONN', 60):
570            self.send_cmd(
571                'BSE:CONFig:LTE:CELL1:CAGGregation:AGGRegate:NRCC:APPly')
572        else:
573            raise RuntimeError('LTE must be connected to start aggregation.')
574
575    def get_ip_throughput(self, cell_type):
576        """Function to query IP layer throughput on LTE or NR
577
578        Args:
579            cell_type: LTE or NR5G
580        Returns:
581            dict containing DL and UL IP-layer throughput
582        """
583        #Tester reply format
584        #{ report-count, total-bytes, current transfer-rate, average transfer-rate, peak transfer-rate }
585        dl_tput = self.send_cmd(
586            'BSE:MEASure:{}:BTHRoughput:DL:THRoughput:IP?'.format(cell_type),
587            1)
588        ul_tput = self.send_cmd(
589            'BSE:MEASure:{}:BTHRoughput:UL:THRoughput:IP?'.format(cell_type),
590            1)
591        return {'dl_tput': dl_tput, 'ul_tput': ul_tput}
592
593    def _get_throughput(self, cell_type, link, cell):
594        """Helper function to get PHY layer throughput on single cell"""
595        if cell_type == 'LTE':
596            tput_response = self.send_cmd(
597                'BSE:MEASure:LTE:{}:BTHRoughput:{}:THRoughput:OTA:{}?'.format(
598                    Keysight5GTestApp._format_cells(cell), link,
599                    Keysight5GTestApp._format_cells(cell)), 1)
600        elif cell_type == 'NR5G':
601            # Tester reply format
602            #progress-count, ack-count, ack-ratio, nack-count, nack-ratio,  statdtx-count,  statdtx-ratio,  pdschBlerCount,  pdschBlerRatio,  pdschTputRatio.
603            tput_response = self.send_cmd(
604                'BSE:MEASure:NR5G:BTHRoughput:{}:THRoughput:OTA:{}?'.format(
605                    link, Keysight5GTestApp._format_cells(cell)), 1)
606        tput_result = {
607            'frame_count': tput_response[0] / 1e6,
608            'current_tput': tput_response[1] / 1e6,
609            'min_tput': tput_response[2] / 1e6,
610            'max_tput': tput_response[3] / 1e6,
611            'average_tput': tput_response[4] / 1e6,
612            'theoretical_tput': tput_response[5] / 1e6,
613        }
614        return tput_result
615
616    def get_throughput(self, cell_type, cells):
617        """Function to get PHY layer throughput on on or more cells
618
619        This function returns the throughput data on the requested cells
620        during the last BLER test run, i.e., throughpt data must be fetch at
621        the end/after a BLE test run on the Keysight Test App.
622
623        Args:
624            cell_type: LTE or NR5G
625            cells: list of cells to query for throughput data
626        Returns:
627            tput_result: dict containing all throughput statistics in Mbps
628        """
629        if not isinstance(cells, list):
630            cells = [cells]
631        tput_result = collections.OrderedDict()
632        for cell in cells:
633            tput_result[cell] = {
634                'DL': self._get_throughput(cell_type, 'DL', cell),
635                'UL': self._get_throughput(cell_type, 'UL', cell)
636            }
637            frame_count = tput_result[cell]['DL']['frame_count']
638        agg_tput = {
639            'DL': {
640                'frame_count': frame_count,
641                'current_tput': 0,
642                'min_tput': 0,
643                'max_tput': 0,
644                'average_tput': 0,
645                'theoretical_tput': 0
646            },
647            'UL': {
648                'frame_count': frame_count,
649                'current_tput': 0,
650                'min_tput': 0,
651                'max_tput': 0,
652                'average_tput': 0,
653                'theoretical_tput': 0
654            }
655        }
656        for cell, cell_tput in tput_result.items():
657            for link, link_tput in cell_tput.items():
658                for key, value in link_tput.items():
659                    if 'tput' in key:
660                        agg_tput[link][key] = agg_tput[link][key] + value
661        tput_result['total'] = agg_tput
662        return tput_result
663
664    def _clear_bler_measurement(self, cell_type):
665        """Helper function to clear BLER results."""
666        if cell_type == 'LTE':
667            self.send_cmd('BSE:MEASure:LTE:CELL1:BTHRoughput:CLEar')
668        elif cell_type == 'NR5G':
669            self.send_cmd('BSE:MEASure:NR5G:BTHRoughput:CLEar')
670
671    def _configure_bler_measurement(self, cell_type, continuous, length):
672        """Helper function to configure BLER results."""
673        if continuous:
674            if cell_type == 'LTE':
675                self.send_cmd('BSE:MEASure:LTE:CELL1:BTHRoughput:CONTinuous 1')
676            elif cell_type == 'NR5G':
677                self.send_cmd('BSE:MEASure:NR5G:BTHRoughput:CONTinuous 1')
678        elif length > 1:
679            if cell_type == 'LTE':
680                self.send_cmd(
681                    'BSE:MEASure:LTE:CELL1:BTHRoughput:LENGth {}'.format(
682                        length))
683                self.send_cmd('BSE:MEASure:LTE:CELL1:BTHRoughput:CONTinuous 0')
684            elif cell_type == 'NR5G':
685                self.send_cmd(
686                    'BSE:MEASure:NR5G:BTHRoughput:LENGth {}'.format(length))
687                self.send_cmd('BSE:MEASure:NR5G:BTHRoughput:CONTinuous 0')
688
689    def _set_bler_measurement_state(self, cell_type, state):
690        """Helper function to start or stop BLER measurement."""
691        if cell_type == 'LTE':
692            self.send_cmd(
693                'BSE:MEASure:LTE:CELL1:BTHRoughput:STATe {}'.format(state))
694        elif cell_type == 'NR5G':
695            self.send_cmd(
696                'BSE:MEASure:NR5G:BTHRoughput:STATe {}'.format(state))
697
698    def start_bler_measurement(self, cell_type, cells, length):
699        """Function to kick off a BLER measurement
700
701        Args:
702            cell_type: LTE or NR5G
703            length: integer length of BLER measurements in subframes
704        """
705        self._clear_bler_measurement(cell_type)
706        self._set_bler_measurement_state(cell_type, 0)
707        self._configure_bler_measurement(cell_type, 0, length)
708        self._set_bler_measurement_state(cell_type, 1)
709        time.sleep(0.1)
710        bler_check = self.get_bler_result(cell_type, cells, length, 0)
711        if bler_check['total']['DL']['frame_count'] == 0:
712            self.log.warning('BLER measurement did not start. Retrying')
713            self.start_bler_measurement(cell_type, cells, length)
714
715    def _get_bler(self, cell_type, link, cell):
716        """Helper function to get single-cell BLER measurement results."""
717        if cell_type == 'LTE':
718            bler_response = self.send_cmd(
719                'BSE:MEASure:LTE:CELL1:BTHRoughput:{}:BLER:CELL1?'.format(
720                    link), 1)
721        elif cell_type == 'NR5G':
722            bler_response = self.send_cmd(
723                'BSE:MEASure:NR5G:BTHRoughput:{}:BLER:{}?'.format(
724                    link, Keysight5GTestApp._format_cells(cell)), 1)
725        bler_result = {
726            'frame_count': bler_response[0],
727            'ack_count': bler_response[1],
728            'ack_ratio': bler_response[2],
729            'nack_count': bler_response[3],
730            'nack_ratio': bler_response[4]
731        }
732        return bler_result
733
734    def get_bler_result(self,
735                        cell_type,
736                        cells,
737                        length,
738                        wait_for_length=1,
739                        polling_interval=SHORT_SLEEP):
740        """Function to get BLER results.
741
742        This function gets the BLER measurements results on one or more
743        requested cells. The function can either return BLER statistics
744        immediately or wait until a certain number of subframes have been
745        counted (e.g. if the BLER measurement is done)
746
747        Args:
748            cell_type: LTE or NR5G
749            cells: list of cells for which to get BLER
750            length: number of subframes to wait for (typically set to the
751                    configured length of the BLER measurements)
752            wait_for_length: boolean to block/wait till length subframes have
753            been counted.
754        Returns:
755            bler_result: dict containing per-cell and aggregate BLER results
756        """
757
758        if not isinstance(cells, list):
759            cells = [cells]
760        while wait_for_length:
761            dl_bler = self._get_bler(cell_type, 'DL', cells[0])
762            if dl_bler['frame_count'] < length:
763                time.sleep(polling_interval)
764            else:
765                break
766
767        bler_result = collections.OrderedDict()
768        for cell in cells:
769            bler_result[cell] = {
770                'DL': self._get_bler(cell_type, 'DL', cell),
771                'UL': self._get_bler(cell_type, 'UL', cell)
772            }
773        agg_bler = {
774            'DL': {
775                'frame_count': length,
776                'ack_count': 0,
777                'ack_ratio': 0,
778                'nack_count': 0,
779                'nack_ratio': 0
780            },
781            'UL': {
782                'frame_count': length,
783                'ack_count': 0,
784                'ack_ratio': 0,
785                'nack_count': 0,
786                'nack_ratio': 0
787            }
788        }
789        for cell, cell_bler in bler_result.items():
790            for link, link_bler in cell_bler.items():
791                for key, value in link_bler.items():
792                    if 'ack_count' in key:
793                        agg_bler[link][key] = agg_bler[link][key] + value
794        dl_ack_nack = agg_bler['DL']['ack_count'] + agg_bler['DL']['nack_count']
795        ul_ack_nack = agg_bler['UL']['ack_count'] + agg_bler['UL']['nack_count']
796        try:
797            agg_bler['DL'][
798                'ack_ratio'] = agg_bler['DL']['ack_count'] / dl_ack_nack
799            agg_bler['DL'][
800                'nack_ratio'] = agg_bler['DL']['nack_count'] / dl_ack_nack
801            agg_bler['UL'][
802                'ack_ratio'] = agg_bler['UL']['ack_count'] / ul_ack_nack
803            agg_bler['UL'][
804                'nack_ratio'] = agg_bler['UL']['nack_count'] / ul_ack_nack
805        except:
806            self.log.debug(bler_result)
807            agg_bler['DL']['ack_ratio'] = 0
808            agg_bler['DL']['nack_ratio'] = 1
809            agg_bler['UL']['ack_ratio'] = 0
810            agg_bler['UL']['nack_ratio'] = 1
811        bler_result['total'] = agg_bler
812        return bler_result
813
814    def measure_bler(self, cell_type, cells, length):
815        """Function to start and wait for BLER results.
816
817        This function starts a BLER test on a number of cells and waits for the
818        test to complete before returning the BLER measurements.
819
820        Args:
821            cell_type: LTE or NR5G
822            cells: list of cells for which to get BLER
823            length: number of subframes to wait for (typically set to the
824                    configured length of the BLER measurements)
825        Returns:
826            bler_result: dict containing per-cell and aggregate BLER results
827        """
828        self.start_bler_measurement(cell_type, cells, length)
829        time.sleep(length * SUBFRAME_DURATION)
830        bler_result = self.get_bler_result(cell_type, cells, length, 1)
831        return bler_result
832
833    def start_nr_rsrp_measurement(self, cells, length):
834        """Function to start 5G NR RSRP measurement.
835
836        Args:
837            cells: list of NR cells to get RSRP on
838            length: length of RSRP measurement in milliseconds
839        Returns:
840            rsrp_result: dict containing per-cell and aggregate BLER results
841        """
842        for cell in cells:
843            self.send_cmd('BSE:MEASure:NR5G:{}:L1:RSRPower:STOP'.format(
844                Keysight5GTestApp._format_cells(cell)))
845        for cell in cells:
846            self.send_cmd('BSE:MEASure:NR5G:{}:L1:RSRPower:LENGth {}'.format(
847                Keysight5GTestApp._format_cells(cell), length))
848        for cell in cells:
849            self.send_cmd('BSE:MEASure:NR5G:{}:L1:RSRPower:STARt'.format(
850                Keysight5GTestApp._format_cells(cell)))
851
852    def get_nr_rsrp_measurement_state(self, cells):
853        for cell in cells:
854            self.log.info(
855                self.send_cmd(
856                    'BSE:MEASure:NR5G:{}:L1:RSRPower:STATe?'.format(
857                        Keysight5GTestApp._format_cells(cell)), 1))
858
859    def get_nr_rsrp_measurement_results(self, cells):
860        for cell in cells:
861            self.log.info(
862                self.send_cmd(
863                    'BSE:MEASure:NR5G:{}:L1:RSRPower:REPorts:JSON?'.format(
864                        Keysight5GTestApp._format_cells(cell)), 1))
865