#!/usr/bin/env python3.4
#
#   Copyright 2021 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the 'License');
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an 'AS IS' BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

import collections
import functools
import pyvisa
import time
from acts import logger
from acts_contrib.test_utils.cellular.performance import cellular_performance_test_utils as cputils

SHORT_SLEEP = 1
VERY_SHORT_SLEEP = 0.1
SUBFRAME_DURATION = 0.001
VISA_QUERY_DELAY = 0.01


class Keysight5GTestApp(object):
    """Controller for the Keysight 5G NR Test Application.

    This controller enables interacting with a Keysight Test Application
    running on a remote test PC and implements many of the configuration
    parameters supported in test app.
    """

    VISA_LOCATION = '/opt/keysight/iolibs/libktvisa32.so'

    def __init__(self, config):
        """Opens the VISA session to the test app and checks its identity.

        Args:
            config: dict with keys 'brand', 'model', 'ip_address' and
                'hislip_interface'.
        """
        self.config = config
        self.log = logger.create_tagged_trace_logger("{}{}".format(
            self.config['brand'], self.config['model']))
        self.resource_manager = pyvisa.ResourceManager(self.VISA_LOCATION)
        self.test_app = self.resource_manager.open_resource(
            'TCPIP0::{}::{}::INSTR'.format(self.config['ip_address'],
                                           self.config['hislip_interface']))
        # pyvisa expresses timeouts in milliseconds.
        self.test_app.timeout = 200000
        self.test_app.write_termination = '\n'
        self.test_app.read_termination = '\n'
        self.test_app.query_delay = VISA_QUERY_DELAY
        # Name of the last SCPI file imported through import_scpi_file.
        self.last_loaded_scpi = None

        inst_id = self.send_cmd('*IDN?', 1)
        if 'Keysight' not in inst_id[0]:
            self.log.error(
                'Failed to connect to Keysight Test App: {}'.format(inst_id))
        else:
            self.log.info("Test App ID: {}".format(inst_id))

    def destroy(self):
        """Closes the VISA session to the test app."""
        self.test_app.close()

    ### Programming Utilities
    @staticmethod
    def _format_cells(cells):
        """Helper function to format list of cells.

        Args:
            cells: int (formatted as 'CELL<n>'), str (returned unchanged) or
                list of either (joined with commas).
        Returns:
            formatted cell string. Any other input type yields None.
        """
        if isinstance(cells, int):
            return 'CELL{}'.format(cells)
        elif isinstance(cells, str):
            return cells
        elif isinstance(cells, list):
            return ','.join(
                Keysight5GTestApp._format_cells(cell) for cell in cells)

    @staticmethod
    def _format_response(response):
        """Helper function to format test app response.

        Args:
            response: raw response string from the test app.
        Returns:
            a single entry if the response contains no comma, otherwise a
            list of entries. Each entry is converted to float when possible
            and left as a string otherwise.
        """

        def _format_response_entry(entry):
            try:
                return float(entry)
            except (TypeError, ValueError):
                # Not a number: keep the raw entry.
                return entry

        if ',' not in response:
            return _format_response_entry(response)
        return [_format_response_entry(entry) for entry in response.split(',')]

    def send_cmd(self, command, read_response=0, check_errors=1):
        """Helper function to write to or query test app.

        Args:
            command: SCPI command string.
            read_response: if truthy, the command is sent as a query and the
                formatted response is returned.
            check_errors: if truthy, 'SYSTem:ERRor?' is queried afterwards
                and any reported error is logged as a warning.
        Returns:
            formatted response for queries, None for writes.
        Raises:
            RuntimeError: if any step of the exchange fails. The original
                exception is attached as the cause.
        """
        if read_response:
            try:
                response = Keysight5GTestApp._format_response(
                    self.test_app.query(command))
                time.sleep(VISA_QUERY_DELAY)
                if check_errors:
                    error = self.test_app.query('SYSTem:ERRor?')
                    time.sleep(VISA_QUERY_DELAY)
                    if 'No error' not in error:
                        self.log.warning("Command: {}. Error: {}".format(
                            command, error))
                return response
            except Exception as e:
                raise RuntimeError('Lost connection to test app.') from e
        else:
            try:
                self.test_app.write(command)
                time.sleep(VISA_QUERY_DELAY)
                if check_errors:
                    error = self.test_app.query('SYSTem:ERRor?')
                    if 'No error' not in error:
                        self.log.warning("Command: {}. Error: {}".format(
                            command, error))
                # Block until the test app reports the operation complete.
                self.send_cmd('*OPC?', 1)
                time.sleep(VISA_QUERY_DELAY)
            except Exception as e:
                raise RuntimeError('Lost connection to test app.') from e
            return None

    def import_scpi_file(self, file_name, check_last_loaded=0):
        """Function to import SCPI file specified in file_name.

        Args:
            file_name: name of SCPI file to run
            check_last_loaded: flag to check last loaded scpi and
            only load if different.
        """
        if file_name == self.last_loaded_scpi and check_last_loaded:
            self.log.info('Skipping SCPI import.')
            return
        self.send_cmd("SYSTem:SCPI:IMPort '{}'".format(file_name))
        # Poll until the test app reports the import is no longer running.
        while int(self.send_cmd('SYSTem:SCPI:IMPort:STATus?', 1)):
            self.send_cmd('*OPC?', 1)
        # Remember the file so a later call can skip a redundant import.
        self.last_loaded_scpi = file_name
        self.log.info('Done with SCPI import')

    ### Configure Cells
    def assert_cell_off_decorator(func):
        """Decorator function that ensures cells are off when configuring them.

        The wrapped method is always called; an error is only logged if the
        addressed cell is on. Methods with "nr" in their name are assumed to
        address NR5G cells and to take the cell as their first argument.
        Other methods are assumed to take (cell_type, cell) first.
        NOTE(review): the decorator is not applied anywhere in this file;
        confirm the argument conventions before using it.
        """

        @functools.wraps(func)
        def inner(self, *args, **kwargs):
            if "nr" in func.__name__:
                cell_type = 'NR5G'
                cell = kwargs['cell'] if 'cell' in kwargs else args[0]
            elif 'cell_type' in kwargs:
                cell_type = kwargs['cell_type']
                # cell_type was a keyword, so a positional cell comes first.
                cell = kwargs['cell'] if 'cell' in kwargs else args[0]
            else:
                cell_type = args[0]
                cell = kwargs['cell'] if 'cell' in kwargs else args[1]
            cell_state = self.get_cell_state(cell_type, cell)
            if cell_state:
                self.log.error('Cell must be off when calling {}'.format(
                    func.__name__))
            return (func(self, *args, **kwargs))

        return inner

    def assert_cell_off(self, cell_type, cell):
        """Logs an error if the given cell is on. Does not raise.

        Args:
            cell_type: LTE or NR5G cell
            cell: cell/carrier number
        """
        cell_state = self.get_cell_state(cell_type, cell)
        if cell_state:
            self.log.error('Cell must be off')

    def select_cell(self, cell_type, cell):
        """Function to select active cell.

        Args:
            cell_type: LTE or NR5G cell
            cell: cell/carrier number
        """
        self.send_cmd('BSE:SELected:CELL {},{}'.format(
            cell_type, Keysight5GTestApp._format_cells(cell)))

    def select_display_tab(self, cell_type, cell, tab, subtab):
        """Function to select display tab.

        Unsupported tab/subtab combinations are silently ignored. The 'CSI'
        tab lists no subtabs, so requests for it are always ignored.

        Args:
            cell_type: LTE or NR5G cell
            cell: cell/carrier number
            tab: tab to display for the selected cell
            subtab: subtab to display within the selected tab
        """
        supported_tabs = {
            'PHY': [
                'BWP', 'HARQ', 'PDSCH', 'PDCCH', 'PRACH', 'PUSCH', 'PUCCH',
                'SRSC'
            ],
            'BTHR': ['SUMMARY', 'OTAGRAPH', 'ULOTA', 'DLOTA'],
            'CSI': []
        }
        if (tab not in supported_tabs) or (subtab not in supported_tabs[tab]):
            return
        self.select_cell(cell_type, cell)
        self.send_cmd('DISPlay:{} {},{}'.format(cell_type, tab, subtab))

    def get_cell_state(self, cell_type, cell):
        """Function to get cell on/off state.

        Args:
            cell_type: LTE or NR5G cell
            cell: cell/carrier number
        Returns:
            cell_state: int parsed from the test app reply. Callers treat a
                non-zero value as "cell on".
        """
        cell_state = int(
            self.send_cmd(
                'BSE:CONFig:{}:{}:ACTive:STATe?'.format(
                    cell_type, Keysight5GTestApp._format_cells(cell)), 1))
        return cell_state

    def wait_for_cell_status(self,
                             cell_type,
                             cell,
                             states,
                             timeout,
                             polling_interval=SHORT_SLEEP):
        """Function to wait for a specific cell status

        Args:
            cell_type: LTE or NR5G cell
            cell: cell/carrier number
            states: list of acceptable states (ON, CONN, AGG, ACT, etc)
            timeout: amount of time to wait for requested status
            polling_interval: time to sleep between status queries
        Returns:
            True if one of the listed states is achieved
            False if timed out waiting for acceptable state.
        """
        states = [states] if isinstance(states, str) else states
        for _ in range(int(timeout / polling_interval)):
            current_state = self.send_cmd(
                'BSE:STATus:{}:{}?'.format(
                    cell_type, Keysight5GTestApp._format_cells(cell)), 1)
            if current_state in states:
                return True
            time.sleep(polling_interval)
        self.log.warning('Timeout waiting for {} {} {}'.format(
            cell_type, Keysight5GTestApp._format_cells(cell), states))
        return False

    def set_cell_state(self, cell_type, cell, state):
        """Function to set cell state

        Args:
            cell_type: LTE or NR5G cell
            cell: cell/carrier number
            state: requested state
        """
        self.send_cmd('BSE:CONFig:{}:{}:ACTive:STATe {}'.format(
            cell_type, Keysight5GTestApp._format_cells(cell), state))

    def set_cell_type(self, cell_type, cell, sa_or_nsa):
        """Function to set cell type (SA or NSA)

        The command always addresses the NR5G cell; cell_type is only used
        for the cell-off check.

        Args:
            cell_type: LTE or NR5G cell
            cell: cell/carrier number
            sa_or_nsa: SA or NSA
        """
        self.assert_cell_off(cell_type, cell)
        # No whitespace inside the header: SCPI uses it to separate the
        # header from its parameters.
        self.send_cmd('BSE:CONFig:NR5G:{}:TYPE {}'.format(
            Keysight5GTestApp._format_cells(cell), sa_or_nsa))

    def set_cell_duplex_mode(self, cell_type, cell, duplex_mode):
        """Function to set cell duplex mode

        Args:
            cell_type: LTE or NR5G cell
            cell: cell/carrier number
            duplex_mode: TDD or FDD
        """
        self.assert_cell_off(cell_type, cell)
        self.send_cmd('BSE:CONFig:{}:{}:DUPLEX:MODe {}'.format(
            cell_type, Keysight5GTestApp._format_cells(cell), duplex_mode))

    def set_cell_band(self, cell_type, cell, band):
        """Function to set cell band

        Args:
            cell_type: LTE or NR5G cell
            cell: cell/carrier number
            band: LTE or NR band (e.g. 1,3,N260, N77)
        """
        self.assert_cell_off(cell_type, cell)
        self.send_cmd('BSE:CONFig:{}:{}:BAND {}'.format(
            cell_type, Keysight5GTestApp._format_cells(cell), band))

    def set_cell_channel(self, cell_type, cell, channel, arfcn=1):
        """Function to set cell frequency/channel

        Args:
            cell_type: LTE or NR5G cell
            cell: cell/carrier number
            channel: requested channel (ARFCN) or frequency in MHz. For NR5G
                cells this may also be one of 'low', 'mid' or 'high'.
            arfcn: 1 if channel is an ARFCN. Any other value treats channel
                as a frequency in MHz.
        """
        self.assert_cell_off(cell_type, cell)
        if cell_type == 'NR5G' and isinstance(
                channel, str) and channel.lower() in ['low', 'mid', 'high']:
            self.send_cmd('BSE:CONFig:{}:{}:TESTChanLoc {}'.format(
                cell_type, Keysight5GTestApp._format_cells(cell),
                channel.upper()))
        elif arfcn == 1:
            self.send_cmd('BSE:CONFig:{}:{}:DL:CHANnel {}'.format(
                cell_type, Keysight5GTestApp._format_cells(cell), channel))
        else:
            # Frequency is given in MHz and sent scaled by 1e6.
            self.send_cmd('BSE:CONFig:{}:{}:DL:FREQuency:MAIN {}'.format(
                cell_type, Keysight5GTestApp._format_cells(cell),
                channel * 1e6))

    def toggle_contiguous_nr_channels(self, force_contiguous):
        """Function to disable and optionally re-enable contiguous NR channels

        Args:
            force_contiguous: if truthy, contiguous optimization is enabled
                after first being disabled.
        """
        self.assert_cell_off('NR5G', 1)
        self.log.warning(
            'Forcing contiguous NR channels overrides channel config.')
        self.send_cmd('BSE:CONFig:NR5G:PHY:OPTimize:CONTiguous:STATe 0')
        if force_contiguous:
            self.send_cmd('BSE:CONFig:NR5G:PHY:OPTimize:CONTiguous:STATe 1')

    def configure_contiguous_nr_channels(self, cell, band, channel):
        """Function to set cell frequency/channel with contiguous carriers

        Args:
            cell: cell/carrier number
            band: band to set channel in (only required for preset)
            channel: frequency in MHz or preset in [low, mid, or high]
        """
        self.assert_cell_off('NR5G', cell)
        self.send_cmd('BSE:CONFig:NR5G:PHY:OPTimize:CONTiguous:STATe 0')
        # Only strings can be presets; numeric input is a frequency in MHz.
        if isinstance(channel, str) and channel.lower() in [
                'low', 'mid', 'high'
        ]:
            pcc_arfcn = cputils.PCC_PRESET_MAPPING[band][channel]
            self.set_cell_channel('NR5G', cell, pcc_arfcn, 1)
        else:
            self.set_cell_channel('NR5G', cell, channel, 0)
        self.send_cmd('BSE:CONFig:NR5G:PHY:OPTimize:CONTiguous:STATe 1')

    def configure_noncontiguous_nr_channels(self, cells, band, channels):
        """Function to set cell frequency/channel on non-contiguous carriers

        Args:
            cells: list of cell/carrier numbers
            band: band number (currently unused)
            channels: list of frequencies in MHz, one per entry in cells
        """
        for cell in cells:
            self.assert_cell_off('NR5G', cell)
        self.send_cmd('BSE:CONFig:NR5G:PHY:OPTimize:CONTiguous:STATe 0')
        for cell, channel in zip(cells, channels):
            self.set_cell_channel('NR5G', cell, channel, arfcn=0)

    def set_cell_bandwidth(self, cell_type, cell, bandwidth):
        """Function to set cell bandwidth

        Args:
            cell_type: LTE or NR5G cell
            cell: cell/carrier number
            bandwidth: requested bandwidth
        """
        self.assert_cell_off(cell_type, cell)
        self.send_cmd('BSE:CONFig:{}:{}:DL:BW {}'.format(
            cell_type, Keysight5GTestApp._format_cells(cell), bandwidth))

    def set_nr_subcarrier_spacing(self, cell, subcarrier_spacing):
        """Function to set NR cell subcarrier spacing

        Args:
            cell: cell/carrier number
            subcarrier_spacing: requested SCS
        """
        self.assert_cell_off('NR5G', cell)
        self.send_cmd('BSE:CONFig:NR5G:{}:SUBCarrier:SPACing:COMMon {}'.format(
            Keysight5GTestApp._format_cells(cell), subcarrier_spacing))

    def set_cell_mimo_config(self, cell_type, cell, link, mimo_config):
        """Function to set cell mimo config.

        Args:
            cell_type: LTE or NR5G cell
            cell: cell/carrier number
            link: uplink or downlink (only used for NR5G cells; other cell
                types always configure the DL antenna config)
            mimo_config: requested mimo configuration (refer to SCPI
            documentation for allowed range of values)
        """
        self.assert_cell_off(cell_type, cell)
        if cell_type == 'NR5G':
            self.send_cmd('BSE:CONFig:{}:{}:{}:MIMO:CONFig {}'.format(
                cell_type, Keysight5GTestApp._format_cells(cell), link,
                mimo_config))
        else:
            self.send_cmd('BSE:CONFig:{}:{}:PHY:DL:ANTenna:CONFig {}'.format(
                cell_type, Keysight5GTestApp._format_cells(cell), mimo_config))

    def set_lte_cell_transmission_mode(self, cell, transmission_mode):
        """Function to set LTE cell transmission mode.

        Args:
            cell: cell/carrier number
            transmission_mode: one of TM1, TM2, TM3, TM4 ...
        """
        self.assert_cell_off('LTE', cell)
        self.send_cmd('BSE:CONFig:LTE:{}:RRC:TMODe {}'.format(
            Keysight5GTestApp._format_cells(cell), transmission_mode))

    def set_cell_dl_power(self, cell_type, cell, power, full_bw):
        """Function to set cell power

        Args:
            cell_type: LTE or NR5G cell
            cell: cell/carrier number
            power: requested power
            full_bw: boolean controlling if requested power is per channel
                     or subcarrier
        """
        if full_bw:
            self.send_cmd('BSE:CONFIG:{}:{}:DL:POWer:CHANnel {}'.format(
                cell_type, Keysight5GTestApp._format_cells(cell), power))
        else:
            self.send_cmd('BSE:CONFIG:{}:{}:DL:POWer:EPRE {}'.format(
                cell_type, Keysight5GTestApp._format_cells(cell), power))
        self.send_cmd('BSE:CONFig:{}:APPLY'.format(cell_type))

    def set_dl_carriers(self, cells):
        """Function to set aggregated DL NR5G carriers

        Args:
            cells: list of DL cells/carriers to aggregate with LTE (e.g. [1,2])
        """
        self.send_cmd('BSE:CONFig:NR5G:CELL1:CAGGregation:NRCC:DL {}'.format(
            Keysight5GTestApp._format_cells(cells)))

    def set_ul_carriers(self, cells):
        """Function to set aggregated UL NR5G carriers

        Args:
            cells: list of UL cells/carriers to aggregate with LTE (e.g. [1,2])
        """
        self.send_cmd('BSE:CONFig:NR5G:CELL1:CAGGregation:NRCC:UL {}'.format(
            Keysight5GTestApp._format_cells(cells)))

    def set_nr_cell_schedule_scenario(self, cell, scenario):
        """Function to set NR schedule to one of predefined quick configs.

        Args:
            cell: cell number to address. schedule will apply to all cells
            scenario: one of the predefined test app schedule quick configs
                      (e.g. FULL_TPUT, BASIC).
        """
        self.assert_cell_off('NR5G', cell)
        self.send_cmd(
            'BSE:CONFig:NR5G:{}:SCHeduling:QCONFig:SCENario {}'.format(
                Keysight5GTestApp._format_cells(cell), scenario))
        self.send_cmd('BSE:CONFig:NR5G:SCHeduling:QCONFig:APPLy:ALL')

    def set_nr_cell_mcs(self, cell, dl_mcs, ul_mcs):
        """Function to set NR cell DL & UL MCS

        Args:
            cell: cell number to address. MCS will apply to all cells
            dl_mcs: mcs index to use on DL
            ul_mcs: mcs index to use on UL
        """
        self.assert_cell_off('NR5G', cell)
        self.send_cmd(
            'BSE:CONFig:NR5G:SCHeduling:SETParameter "CELLALL:BWPALL:FCALL:SCALL", "DL:IMCS", "{}"'
            .format(dl_mcs))
        self.send_cmd(
            'BSE:CONFig:NR5G:SCHeduling:SETParameter "CELLALL:BWPALL:FCALL:SCALL", "UL:IMCS", "{}"'
            .format(ul_mcs))

    def set_lte_cell_mcs(
        self,
        cell,
        dl_mcs_table,
        dl_mcs,
        ul_mcs_table,
        ul_mcs,
    ):
        """Function to set LTE cell DL & UL MCS

        Args:
            cell: cell number to address. MCS will apply to all cells
            dl_mcs_table: DL MCS table, one of QAM64, QAM256 or QAM1024
            dl_mcs: mcs index to use on DL
            ul_mcs_table: UL MCS table, passed to the test app unchanged
            ul_mcs: mcs index to use on UL
        Raises:
            ValueError: if dl_mcs_table is not a supported table.
        """
        # Test app parameter value for each supported DL MCS table.
        dl_mcs_table_mapping = {
            'QAM256': 'ASUBframe',
            'QAM1024': 'ASUB1024',
            'QAM64': 'DISabled',
        }
        if dl_mcs_table not in dl_mcs_table_mapping:
            raise ValueError(
                'Unsupported DL MCS table: {}. Supported tables: {}'.format(
                    dl_mcs_table, sorted(dl_mcs_table_mapping)))
        dl_mcs_table_formatted = dl_mcs_table_mapping[dl_mcs_table]
        self.assert_cell_off('LTE', cell)
        self.send_cmd(
            'BSE:CONFig:LTE:SCHeduling:SETParameter "CELLALL", "DL:MCS:TABle", "{}"'
            .format(dl_mcs_table_formatted))
        self.send_cmd(
            'BSE:CONFig:LTE:SCHeduling:SETParameter "CELLALL:SFALL:CWALL", "DL:IMCS", "{}"'
            .format(dl_mcs))
        self.send_cmd(
            'BSE:CONFig:LTE:SCHeduling:SETParameter "CELLALL", "UL:MCS:TABle", "{}"'
            .format(ul_mcs_table))
        self.send_cmd(
            'BSE:CONFig:LTE:SCHeduling:SETParameter "CELLALL:SFALL", "UL:IMCS", "{}"'
            .format(ul_mcs))

    def set_lte_control_region_size(self, cell, num_symbols):
        """Function to set the LTE control region size (PCFICH CFI)

        Args:
            cell: cell/carrier number
            num_symbols: requested CFI value
        """
        self.assert_cell_off('LTE', cell)
        self.send_cmd('BSE:CONFig:LTE:{}:PHY:PCFich:CFI {}'.format(
            Keysight5GTestApp._format_cells(cell), num_symbols))

    def set_lte_ul_mac_padding(self, mac_padding):
        """Function to enable or disable LTE UL MAC padding on all cells

        Args:
            mac_padding: truthy to enable padding, falsy to disable
        """
        self.assert_cell_off('LTE', 'CELL1')
        padding_str = 'TRUE' if mac_padding else 'FALSE'
        self.send_cmd(
            'BSE:CONFig:LTE:SCHeduling:SETParameter "CELLALL", "UL:MAC:PADDING", "{}"'
            .format(padding_str))

    def set_nr_ul_dft_precoding(self, cell, precoding):
        """Function to configure DFT-precoding on uplink.

        Args:
            cell: cell number to address. The scheduling parameter is also
                set on all cells.
            precoding: 0/1 to disable/enable precoding
        """
        self.assert_cell_off('NR5G', cell)
        precoding_str = "ENABled" if precoding else "DISabled"
        self.send_cmd(
            'BSE:CONFig:NR5G:{}:SCHeduling:QCONFig:UL:TRANsform:PRECoding {}'.
            format(Keysight5GTestApp._format_cells(cell), precoding_str))
        precoding_str = "True" if precoding else "False"
        self.send_cmd(
            'BSE:CONFig:NR5G:SCHeduling:SETParameter "CELLALL:BWPALL", "UL:TPEnabled", "{}"'
            .format(precoding_str))

    def configure_ul_clpc(self, channel, mode, target):
        """Function to configure UL power control on all cells/carriers

        Args:
            channel: physical channel must be PUSCh or PUCCh
            mode: mode supported by test app (all up/down bits, target, etc)
            target: target power if mode is set to target
        """
        self.send_cmd('BSE:CONFig:NR5G:UL:{}:CLPControl:MODE:ALL {}'.format(
            channel, mode))
        if "tar" in mode.lower():
            self.send_cmd(
                'BSE:CONFig:NR5G:UL:{}:CLPControl:TARGet:POWer:ALL {}'.format(
                    channel, target))

    def apply_lte_carrier_agg(self, cells):
        """Function to start LTE carrier aggregation on already configured cells

        Nothing is sent if LTE CELL1 does not reach CONN within 60 seconds.

        Args:
            cells: list of secondary cells to aggregate and activate
        """
        if self.wait_for_cell_status('LTE', 'CELL1', 'CONN', 60):
            self.send_cmd(
                'BSE:CONFig:LTE:CELL1:CAGGregation:AGGRegate:SCC {}'.format(
                    Keysight5GTestApp._format_cells(cells)))
            self.send_cmd(
                'BSE:CONFig:LTE:CELL1:CAGGregation:ACTivate:SCC {}'.format(
                    Keysight5GTestApp._format_cells(cells)))

    def apply_carrier_agg(self):
        """Function to start carrier aggregation on already configured cells

        Raises:
            RuntimeError: if LTE CELL1 does not reach CONN within 60 seconds.
        """
        if self.wait_for_cell_status('LTE', 'CELL1', 'CONN', 60):
            self.send_cmd(
                'BSE:CONFig:LTE:CELL1:CAGGregation:AGGRegate:NRCC:APPly')
        else:
            raise RuntimeError('LTE must be connected to start aggregation.')

    def get_ip_throughput(self, cell_type):
        """Function to query IP layer throughput on LTE or NR

        Args:
            cell_type: LTE or NR5G
        Returns:
            dict containing DL and UL IP-layer throughput
        """
        #Tester reply format
        #{ report-count, total-bytes, current transfer-rate, average transfer-rate, peak transfer-rate }
        dl_tput = self.send_cmd(
            'BSE:MEASure:{}:BTHRoughput:DL:THRoughput:IP?'.format(cell_type),
            1)
        ul_tput = self.send_cmd(
            'BSE:MEASure:{}:BTHRoughput:UL:THRoughput:IP?'.format(cell_type),
            1)
        return {'dl_tput': dl_tput, 'ul_tput': ul_tput}

    def _get_throughput(self, cell_type, link, cell):
        """Helper function to get PHY layer throughput on single cell

        Args:
            cell_type: LTE or NR5G
            link: DL or UL
            cell: cell/carrier number
        Returns:
            dict with the first six reply fields, each divided by 1e6.
        Raises:
            ValueError: if cell_type is neither LTE nor NR5G.
        """
        if cell_type == 'LTE':
            tput_response = self.send_cmd(
                'BSE:MEASure:LTE:{}:BTHRoughput:{}:THRoughput:OTA:{}?'.format(
                    Keysight5GTestApp._format_cells(cell), link,
                    Keysight5GTestApp._format_cells(cell)), 1)
        elif cell_type == 'NR5G':
            # Tester reply format
            #progress-count, ack-count, ack-ratio, nack-count, nack-ratio, statdtx-count, statdtx-ratio, pdschBlerCount, pdschBlerRatio, pdschTputRatio.
            # NOTE(review): the fields listed above look like a BLER reply,
            # not a throughput reply; confirm against SCPI documentation.
            tput_response = self.send_cmd(
                'BSE:MEASure:NR5G:BTHRoughput:{}:THRoughput:OTA:{}?'.format(
                    link, Keysight5GTestApp._format_cells(cell)), 1)
        else:
            raise ValueError('Unsupported cell type: {}'.format(cell_type))
        # NOTE(review): frame_count is scaled by 1e6 like the throughput
        # fields; confirm this is intended.
        tput_result = {
            'frame_count': tput_response[0] / 1e6,
            'current_tput': tput_response[1] / 1e6,
            'min_tput': tput_response[2] / 1e6,
            'max_tput': tput_response[3] / 1e6,
            'average_tput': tput_response[4] / 1e6,
            'theoretical_tput': tput_response[5] / 1e6,
        }
        return tput_result

    def get_throughput(self, cell_type, cells):
        """Function to get PHY layer throughput on one or more cells

        This function returns the throughput data on the requested cells
        during the last BLER test run, i.e., throughput data must be fetched
        at the end/after a BLER test run on the Keysight Test App.

        Args:
            cell_type: LTE or NR5G
            cells: list of cells to query for throughput data
        Returns:
            tput_result: dict containing all throughput statistics in Mbps.
                Per-cell results are keyed by cell and the sum over all
                cells is stored under 'total'.
        """
        if not isinstance(cells, list):
            cells = [cells]
        tput_result = collections.OrderedDict()
        # Stays 0 when no cells are requested; otherwise the DL frame count
        # of the last queried cell is reported in the aggregate.
        frame_count = 0
        for cell in cells:
            tput_result[cell] = {
                'DL': self._get_throughput(cell_type, 'DL', cell),
                'UL': self._get_throughput(cell_type, 'UL', cell)
            }
            frame_count = tput_result[cell]['DL']['frame_count']
        agg_tput = {
            'DL': {
                'frame_count': frame_count,
                'current_tput': 0,
                'min_tput': 0,
                'max_tput': 0,
                'average_tput': 0,
                'theoretical_tput': 0
            },
            'UL': {
                'frame_count': frame_count,
                'current_tput': 0,
                'min_tput': 0,
                'max_tput': 0,
                'average_tput': 0,
                'theoretical_tput': 0
            }
        }
        for cell_tput in tput_result.values():
            for link, link_tput in cell_tput.items():
                for key, value in link_tput.items():
                    # Only throughput fields are summed across cells.
                    if 'tput' in key:
                        agg_tput[link][key] = agg_tput[link][key] + value
        tput_result['total'] = agg_tput
        return tput_result

    def _clear_bler_measurement(self, cell_type):
        """Helper function to clear BLER results."""
        if cell_type == 'LTE':
            self.send_cmd('BSE:MEASure:LTE:CELL1:BTHRoughput:CLEar')
        elif cell_type == 'NR5G':
            self.send_cmd('BSE:MEASure:NR5G:BTHRoughput:CLEar')

    def _configure_bler_measurement(self, cell_type, continuous, length):
        """Helper function to configure BLER results.

        Args:
            cell_type: LTE or NR5G
            continuous: if truthy, continuous measurement is enabled and
                length is ignored.
            length: measurement length. Only applied when greater than 1 and
                continuous is falsy; otherwise nothing is configured.
        """
        if continuous:
            if cell_type == 'LTE':
                self.send_cmd('BSE:MEASure:LTE:CELL1:BTHRoughput:CONTinuous 1')
            elif cell_type == 'NR5G':
                self.send_cmd('BSE:MEASure:NR5G:BTHRoughput:CONTinuous 1')
        elif length > 1:
            if cell_type == 'LTE':
                self.send_cmd(
                    'BSE:MEASure:LTE:CELL1:BTHRoughput:LENGth {}'.format(
                        length))
                self.send_cmd('BSE:MEASure:LTE:CELL1:BTHRoughput:CONTinuous 0')
            elif cell_type == 'NR5G':
                self.send_cmd(
                    'BSE:MEASure:NR5G:BTHRoughput:LENGth {}'.format(length))
                self.send_cmd('BSE:MEASure:NR5G:BTHRoughput:CONTinuous 0')

    def _set_bler_measurement_state(self, cell_type, state):
        """Helper function to start or stop BLER measurement."""
        if cell_type == 'LTE':
            self.send_cmd(
                'BSE:MEASure:LTE:CELL1:BTHRoughput:STATe {}'.format(state))
        elif cell_type == 'NR5G':
            self.send_cmd(
                'BSE:MEASure:NR5G:BTHRoughput:STATe {}'.format(state))

    def start_bler_measurement(self, cell_type, cells, length):
        """Function to kick off a BLER measurement

        Args:
            cell_type: LTE or NR5G
            cells: list of cells on which BLER is checked after starting
            length: integer length of BLER measurements in subframes
        """
        self._clear_bler_measurement(cell_type)
        self._set_bler_measurement_state(cell_type, 0)
        self._configure_bler_measurement(cell_type, 0, length)
        self._set_bler_measurement_state(cell_type, 1)
        time.sleep(0.1)
        bler_check = self.get_bler_result(cell_type, cells, length, 0)
        # NOTE(review): get_bler_result reports the aggregate frame_count as
        # the requested length, so this retry only triggers when length is 0
        # (and then recurses without bound). Confirm the intended check
        # before relying on the retry.
        if bler_check['total']['DL']['frame_count'] == 0:
            self.log.warning('BLER measurement did not start. Retrying')
            self.start_bler_measurement(cell_type, cells, length)

    def _get_bler(self, cell_type, link, cell):
        """Helper function to get single-cell BLER measurement results.

        Args:
            cell_type: LTE or NR5G. LTE always queries CELL1.
            link: DL or UL
            cell: cell/carrier number (only used for NR5G)
        Returns:
            dict with the first five reply fields.
        Raises:
            ValueError: if cell_type is neither LTE nor NR5G.
        """
        if cell_type == 'LTE':
            bler_response = self.send_cmd(
                'BSE:MEASure:LTE:CELL1:BTHRoughput:{}:BLER:CELL1?'.format(
                    link), 1)
        elif cell_type == 'NR5G':
            bler_response = self.send_cmd(
                'BSE:MEASure:NR5G:BTHRoughput:{}:BLER:{}?'.format(
                    link, Keysight5GTestApp._format_cells(cell)), 1)
        else:
            raise ValueError('Unsupported cell type: {}'.format(cell_type))
        bler_result = {
            'frame_count': bler_response[0],
            'ack_count': bler_response[1],
            'ack_ratio': bler_response[2],
            'nack_count': bler_response[3],
            'nack_ratio': bler_response[4]
        }
        return bler_result

    def get_bler_result(self,
                        cell_type,
                        cells,
                        length,
                        wait_for_length=1,
                        polling_interval=SHORT_SLEEP):
        """Function to get BLER results.

        This function gets the BLER measurements results on one or more
        requested cells. The function can either return BLER statistics
        immediately or wait until a certain number of subframes have been
        counted (e.g. if the BLER measurement is done)

        Args:
            cell_type: LTE or NR5G
            cells: list of cells for which to get BLER
            length: number of subframes to wait for (typically set to the
                    configured length of the BLER measurements)
            wait_for_length: boolean to block/wait till length subframes have
            been counted. The wait has no timeout.
            polling_interval: time to sleep between polls while waiting
        Returns:
            bler_result: dict containing per-cell and aggregate BLER results
        """

        if not isinstance(cells, list):
            cells = [cells]
        # Progress is tracked on the DL of the first requested cell only.
        while wait_for_length:
            dl_bler = self._get_bler(cell_type, 'DL', cells[0])
            if dl_bler['frame_count'] < length:
                time.sleep(polling_interval)
            else:
                break

        bler_result = collections.OrderedDict()
        for cell in cells:
            bler_result[cell] = {
                'DL': self._get_bler(cell_type, 'DL', cell),
                'UL': self._get_bler(cell_type, 'UL', cell)
            }
        agg_bler = {
            'DL': {
                'frame_count': length,
                'ack_count': 0,
                'ack_ratio': 0,
                'nack_count': 0,
                'nack_ratio': 0
            },
            'UL': {
                'frame_count': length,
                'ack_count': 0,
                'ack_ratio': 0,
                'nack_count': 0,
                'nack_ratio': 0
            }
        }
        for cell_bler in bler_result.values():
            for link, link_bler in cell_bler.items():
                for key, value in link_bler.items():
                    # Substring match sums both 'ack_count' and 'nack_count'.
                    if 'ack_count' in key:
                        agg_bler[link][key] = agg_bler[link][key] + value
        # Ratios are computed per link so that a link with no ACK/NACK
        # counts does not overwrite valid ratios on the other link.
        for link in ('DL', 'UL'):
            ack_nack = (agg_bler[link]['ack_count'] +
                        agg_bler[link]['nack_count'])
            try:
                agg_bler[link]['ack_ratio'] = (agg_bler[link]['ack_count'] /
                                               ack_nack)
                agg_bler[link]['nack_ratio'] = (agg_bler[link]['nack_count'] /
                                                ack_nack)
            except ZeroDivisionError:
                self.log.debug(bler_result)
                agg_bler[link]['ack_ratio'] = 0
                agg_bler[link]['nack_ratio'] = 1
        bler_result['total'] = agg_bler
        return bler_result

    def measure_bler(self, cell_type, cells, length):
        """Function to start and wait for BLER results.

        This function starts a BLER test on a number of cells and waits for the
        test to complete before returning the BLER measurements.

        Args:
            cell_type: LTE or NR5G
            cells: list of cells for which to get BLER
            length: number of subframes to wait for (typically set to the
                    configured length of the BLER measurements)
        Returns:
            bler_result: dict containing per-cell and aggregate BLER results
        """
        self.start_bler_measurement(cell_type, cells, length)
        time.sleep(length * SUBFRAME_DURATION)
        bler_result = self.get_bler_result(cell_type, cells, length, 1)
        return bler_result

    def start_nr_rsrp_measurement(self, cells, length):
        """Function to start 5G NR RSRP measurement.

        The measurement is stopped, configured and started on every cell.
        Nothing is returned.

        Args:
            cells: list of NR cells to get RSRP on
            length: length of RSRP measurement in milliseconds
        """
        for cell in cells:
            self.send_cmd('BSE:MEASure:NR5G:{}:L1:RSRPower:STOP'.format(
                Keysight5GTestApp._format_cells(cell)))
        for cell in cells:
            self.send_cmd('BSE:MEASure:NR5G:{}:L1:RSRPower:LENGth {}'.format(
                Keysight5GTestApp._format_cells(cell), length))
        for cell in cells:
            self.send_cmd('BSE:MEASure:NR5G:{}:L1:RSRPower:STARt'.format(
                Keysight5GTestApp._format_cells(cell)))

    def get_nr_rsrp_measurement_state(self, cells):
        """Function to log the RSRP measurement state of each NR cell.

        The state is only logged, not returned.

        Args:
            cells: list of NR cells to query
        """
        for cell in cells:
            self.log.info(
                self.send_cmd(
                    'BSE:MEASure:NR5G:{}:L1:RSRPower:STATe?'.format(
                        Keysight5GTestApp._format_cells(cell)), 1))

    def get_nr_rsrp_measurement_results(self, cells):
        """Function to log the RSRP measurement reports of each NR cell.

        The reports are only logged, not returned.

        Args:
            cells: list of NR cells to query
        """
        for cell in cells:
            self.log.info(
                self.send_cmd(
                    'BSE:MEASure:NR5G:{}:L1:RSRPower:REPorts:JSON?'.format(
                        Keysight5GTestApp._format_cells(cell)), 1))