1#!/usr/bin/env python3 2# 3# Copyright 2021 - The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16 17import logging 18import time 19import sys 20 21from enum import Enum 22from os import path 23from acts.controllers import abstract_inst 24 25DEFAULT_XLAPI_PATH = '/home/mobileharness/Rohde-Schwarz/XLAPI/latest/venv/lib/python3.7/site-packages' 26DEFAULT_LTE_STATE_CHANGE_TIMER = 10 27DEFAULT_CELL_SWITCH_ON_TIMER = 60 28DEFAULT_ENDC_TIMER = 300 29 30logger = logging.getLogger('Xlapi_cmx500') 31 32LTE_CELL_PROPERTIES = [ 33 'band', 34 'bandwidth', 35 'dl_earfcn', 36 'ul_earfcn', 37 'total_dl_power', 38 'p_b', 39 'dl_epre', 40 'ref_signal_power', 41 'm', 42 'beamforming_antenna_ports', 43 'p0_nominal_pusch', 44] 45 46LTE_MHZ_UPPER_BOUND_TO_RB = [ 47 (1.5, 6), 48 (4.0, 15), 49 (7.5, 25), 50 (12.5, 50), 51 (17.5, 75), 52] 53 54 55class DciFormat(Enum): 56 """Support DCI Formats for MIMOs.""" 57 DCI_FORMAT_0 = 1 58 DCI_FORMAT_1 = 2 59 DCI_FORMAT_1A = 3 60 DCI_FORMAT_1B = 4 61 DCI_FORMAT_1C = 5 62 DCI_FORMAT_2 = 6 63 DCI_FORMAT_2A = 7 64 DCI_FORMAT_2B = 8 65 DCI_FORMAT_2C = 9 66 DCI_FORMAT_2D = 10 67 68 69class DuplexMode(Enum): 70 """Duplex Modes.""" 71 FDD = 'FDD' 72 TDD = 'TDD' 73 DL_ONLY = 'DL_ONLY' 74 75 76class LteBandwidth(Enum): 77 """Supported LTE bandwidths.""" 78 BANDWIDTH_1MHz = 6 # MHZ_1 is RB_6 79 BANDWIDTH_3MHz = 15 # MHZ_3 is RB_15 80 BANDWIDTH_5MHz = 25 # MHZ_5 is RB_25 81 BANDWIDTH_10MHz = 50 # MHZ_10 is RB_50 82 BANDWIDTH_15MHz = 75 # MHZ_15 is RB_75 83 BANDWIDTH_20MHz = 100 # MHZ_20 is RB_100 84 85 86class LteState(Enum): 87 """LTE ON and OFF.""" 88 LTE_ON = 'ON' 89 LTE_OFF = 'OFF' 90 91 92class MimoModes(Enum): 93 """MIMO Modes dl antennas.""" 94 MIMO1x1 = 1 95 MIMO2x2 = 2 96 MIMO4x4 = 4 97 98 99class ModulationType(Enum): 100 """Supported Modulation Types.""" 101 Q16 = 0 102 Q64 = 1 103 Q256 = 2 104 105 106class NasState(Enum): 107 """NAS state between callbox and dut.""" 108 DEREGISTERED = 'OFF' 109 EMM_REGISTERED = 'EMM' 110 MM5G_REGISTERED = 'NR' 111 112 113class RrcState(Enum): 114 """States to enable/disable rrc.""" 115 RRC_ON = 'ON' 116 RRC_OFF = 'OFF' 117 118 119class RrcConnectionState(Enum): 120 """RRC Connection states, describes possible DUT RRC connection states.""" 121 IDLE = 1 122 IDLE_PAGING = 2 123 IDLE_CONNECTION_ESTABLISHMENT = 3 124 CONNECTED = 4 125 CONNECTED_CONNECTION_REESTABLISHMENT = 5 126 CONNECTED_SCG_FAILURE = 6 127 CONNECTED_HANDOVER = 7 128 CONNECTED_CONNECTION_RELEASE = 8 129 130 131class SchedulingMode(Enum): 132 """Supported scheduling modes.""" 133 USERDEFINEDCH = 'UDCHannels' 134 135 136class TransmissionModes(Enum): 137 """Supported transmission modes.""" 138 TM1 = 1 139 TM2 = 2 140 TM3 = 3 141 TM4 = 4 142 TM7 = 7 143 TM8 = 8 144 TM9 = 9 145 146 147# For mimo 1x1, also set_num_crs_antenna_ports to 1 148MIMO_MAX_LAYER_MAPPING = { 149 MimoModes.MIMO1x1: 2, 150 MimoModes.MIMO2x2: 2, 151 MimoModes.MIMO4x4: 4, 152} 153 154 155class Cmx500(abstract_inst.SocketInstrument): 156 def __init__(self, ip_addr, port, xlapi_path=DEFAULT_XLAPI_PATH): 157 """Init method to setup variables for the controller. 158 159 Args: 160 ip_addr: Controller's ip address. 161 port: Port. 162 """ 163 164 # keeps the socket connection for debug purpose for now 165 super().__init__(ip_addr, port) 166 if not xlapi_path in sys.path: 167 sys.path.insert(0, xlapi_path) 168 self._initial_xlapi() 169 self._settings.system.set_instrument_address(ip_addr) 170 logger.info('The instrument address is {}'.format( 171 self._settings.system.get_instrument_address())) 172 173 self.bts = [] 174 175 # Stops all active cells if there is any 176 self.disconnect() 177 178 self.dut = self._network.get_dut() 179 self.lte_cell = self._network.create_lte_cell('ltecell0') 180 self.nr_cell = self._network.create_nr_cell('nrcell0') 181 self._config_antenna_ports() 182 183 self.lte_rrc_state_change_timer = DEFAULT_LTE_STATE_CHANGE_TIMER 184 self.rrc_state_change_time_enable = False 185 self.cell_switch_on_timer = DEFAULT_CELL_SWITCH_ON_TIMER 186 187 # _config_antenna_ports for the special RF connection with cmw500 + cmx500. 188 def _config_antenna_ports(self): 189 from rs_mrt.testenvironment.signaling.sri.rat.common import CsiRsAntennaPorts 190 from rs_mrt.testenvironment.signaling.sri.rat.lte import CrsAntennaPorts 191 192 max_csi_rs_ports = CsiRsAntennaPorts.NUMBER_CSI_RS_ANTENNA_PORTS_FOUR 193 max_crs_ports = CrsAntennaPorts.NUMBER_CRS_ANTENNA_PORTS_FOUR 194 195 lte_cell_max_config = self.lte_cell.stub.GetMaximumConfiguration() 196 lte_cell_max_config.csi_rs_antenna_ports = max_csi_rs_ports 197 lte_cell_max_config.crs_antenna_ports = max_crs_ports 198 self.lte_cell.stub.SetMaximumConfiguration(lte_cell_max_config) 199 200 nr_cell_max_config = self.nr_cell.stub.GetMaximumConfiguration() 201 nr_cell_max_config.csi_rs_antenna_ports = max_csi_rs_ports 202 self.nr_cell.stub.SetMaximumConfiguration(nr_cell_max_config) 203 204 def _initial_xlapi(self): 205 import xlapi 206 import mrtype 207 from xlapi import network 208 from xlapi import settings 209 210 self._xlapi = xlapi 211 self._network = network 212 self._settings = settings 213 214 def configure_mimo_settings(self, mimo, bts_index=0): 215 """Sets the mimo scenario for the test. 216 217 Args: 218 mimo: mimo scenario to set. 219 """ 220 self.bts[bts_index].set_mimo_mode(mimo) 221 222 @property 223 def connection_type(self): 224 """Gets the connection type applied in callbox.""" 225 state = self.dut.state.rrc_connection_state 226 return RrcConnectionState(state.value) 227 228 def create_base_station(self, cell): 229 """Creates the base station object with cell and current object. 230 231 Args: 232 cell: the XLAPI cell. 233 234 Returns: 235 base station object. 236 Raise: 237 CmxError if the cell is neither LTE nor NR. 238 """ 239 from xlapi.lte_cell import LteCell 240 from xlapi.nr_cell import NrCell 241 if isinstance(cell, LteCell): 242 return LteBaseStation(self, cell) 243 elif isinstance(cell, NrCell): 244 return NrBaseStation(self, cell) 245 else: 246 raise CmxError('The cell type is neither LTE nor NR') 247 248 def detach(self): 249 """Detach callbox and controller.""" 250 for bts in self.bts: 251 bts.stop() 252 253 def disable_packet_switching(self): 254 """Disable packet switching in call box.""" 255 raise NotImplementedError() 256 257 def disconnect(self): 258 """Disconnect controller from device and switch to local mode.""" 259 260 self.bts.clear() 261 self._network.reset() 262 263 def enable_packet_switching(self): 264 """Enable packet switching in call box.""" 265 raise NotImplementedError() 266 267 def get_cell_configs(self, cell): 268 """Getes cell settings. 269 270 This method is for debugging purpose. In XLAPI, there are many get 271 methods in the cell or component carrier object. When this method is 272 called, the corresponding value could be recorded with logging, which is 273 useful for debug. 274 275 Args: 276 cell: the lte or nr cell in xlapi 277 """ 278 cell_getters = [attr for attr in dir(cell) if attr.startswith('get')] 279 for attr in cell_getters: 280 try: 281 getter = getattr(cell, attr) 282 logger.info('The {} is {}'.format(attr, getter())) 283 except Exception as e: 284 logger.warning('Error in get {}: {}'.format(attr, e)) 285 286 def get_base_station(self, bts_index=0): 287 """Gets the base station object based on bts num. By default 288 bts_index set to 0 (PCC). 289 290 Args: 291 bts_num: base station identifier 292 293 Returns: 294 base station object. 295 """ 296 return self.bts[bts_index] 297 298 def get_network(self): 299 """ Gets the network object from cmx500 object.""" 300 return self._network 301 302 def init_lte_measurement(self): 303 """Gets the class object for lte measurement which can be used to 304 initiate measurements. 305 306 Returns: 307 lte measurement object. 308 """ 309 raise NotImplementedError() 310 311 def reset(self): 312 """System level reset.""" 313 314 self.disconnect() 315 316 @property 317 def rrc_connection(self): 318 """Gets the RRC connection state.""" 319 return self.dut.state.rrc.is_connected 320 321 def set_timer(self, timeout): 322 """Sets timer for the Cmx500 class.""" 323 self.rrc_state_change_time_enable = True 324 self.lte_rrc_state_change_timer = timeout 325 326 def switch_lte_signalling(self, state): 327 """ Turns LTE signalling ON/OFF. 328 329 Args: 330 state: an instance of LteState indicating the state to which LTE 331 signal has to be set. 332 """ 333 if not isinstance(state, LteState): 334 raise ValueError('state should be the instance of LteState.') 335 336 if self.bts: 337 self.disconnect() 338 self.bts.append(LteBaseStation(self, self.lte_cell)) 339 # Switch on the primary Lte cell for on state and switch all lte cells 340 # if the state is off state 341 if state.value == 'ON': 342 self.bts[0].start() 343 cell_status = self.bts[0].wait_cell_on(self.cell_switch_on_timer) 344 if cell_status: 345 logger.info('The LTE pcell status is on') 346 else: 347 raise CmxError('The LTE pcell cannot be switched on') 348 else: 349 for bts in self.bts: 350 if isinstance(bts, LteBaseStation): 351 bts.stop() 352 logger.info('The LTE cell status is {} after stop'.format( 353 bts.is_on())) 354 355 def switch_on_nsa_signalling(self): 356 357 from mrtype.counters import N310 358 359 if self.bts: 360 self.disconnect() 361 logger.info('Switches on NSA signalling') 362 363 # Sets n310 timer to N310.N20 to make endc more stable 364 logger.info('set nr cell n310 timer to N310.N20') 365 self.nr_cell.set_n310(N310.N20) 366 self.bts.append(LteBaseStation(self, self.lte_cell)) 367 self.bts.append(NrBaseStation(self, self.nr_cell)) 368 369 self.bts[0].start() 370 lte_cell_status = self.bts[0].wait_cell_on(self.cell_switch_on_timer) 371 if lte_cell_status: 372 logger.info('The LTE pcell status is on') 373 else: 374 raise CmxError('The LTE pcell cannot be switched on') 375 376 self.bts[1].start() 377 nr_cell_status = self.bts[1].wait_cell_on(self.cell_switch_on_timer) 378 if nr_cell_status: 379 logger.info('The NR cell status is on') 380 else: 381 raise CmxError('The NR cell cannot be switched on') 382 time.sleep(5) 383 384 def update_lte_cell_config(self, config): 385 """Updates lte cell settings with config.""" 386 set_counts = 0 387 for property in LTE_CELL_PROPERTIES: 388 if property in config: 389 setter_name = 'set_' + property 390 setter = getattr(self.lte_cell, setter_name) 391 setter(config[property]) 392 set_counts += 1 393 if set_counts < len(config): 394 logger.warning('Not all configs were set in update_cell_config') 395 396 @property 397 def use_carrier_specific(self): 398 """Gets current status of carrier specific duplex configuration.""" 399 raise NotImplementedError() 400 401 @use_carrier_specific.setter 402 def use_carrier_specific(self, state): 403 """Sets the carrier specific duplex configuration. 404 405 Args: 406 state: ON/OFF UCS configuration. 407 """ 408 raise NotImplementedError() 409 410 def wait_for_rrc_state(self, state, timeout=120): 411 """ Waits until a certain RRC state is set. 412 413 Args: 414 state: the RRC state that is being waited for. 415 timeout: timeout for phone to be in connected state. 416 417 Raises: 418 CmxError on time out. 419 """ 420 is_idle = (state.value == 'OFF') 421 for idx in range(timeout): 422 time.sleep(1) 423 if self.dut.state.rrc.is_idle == is_idle: 424 logger.info('{} reached at {} s'.format(state.value, idx)) 425 return True 426 error_message = 'Waiting for {} state timeout after {}'.format( 427 state.value, timeout) 428 logger.error(error_message) 429 raise CmxError(error_message) 430 431 def wait_until_attached(self, timeout=120): 432 """Waits until Lte attached. 433 434 Args: 435 timeout: timeout for phone to get attached. 436 437 Raises: 438 CmxError on time out. 439 """ 440 try: 441 self.dut.signaling.wait_for_lte_attach(self.lte_cell, timeout) 442 except: 443 raise CmxError( 444 'wait_until_attached timeout after {}'.format(timeout)) 445 446 def send_sms(self, message): 447 """ Sends an SMS message to the DUT. 448 449 Args: 450 message: the SMS message to send. 451 """ 452 self.dut.signaling.mt_sms(message) 453 454 455class BaseStation(object): 456 """Class to interact with different the base stations.""" 457 def __init__(self, cmx, cell): 458 """Init method to setup variables for base station. 459 460 Args: 461 cmx: Controller (Cmx500) object. 462 cell: The cell for the base station. 463 """ 464 465 self._cell = cell 466 self._cmx = cmx 467 self._cc = cmx.dut.cc(cell) 468 self._network = cmx.get_network() 469 470 @property 471 def band(self): 472 """Gets the current band of cell. 473 474 Return: 475 the band number in int. 476 """ 477 cell_band = self._cell.get_band() 478 return int(cell_band) 479 480 @property 481 def dl_power(self): 482 """Gets RSPRE level. 483 484 Return: 485 the power level in dbm. 486 """ 487 return self._cell.get_total_dl_power().in_dBm() 488 489 @property 490 def duplex_mode(self): 491 """Gets current duplex of cell.""" 492 band = self._cell.get_band() 493 if band.is_fdd(): 494 return DuplexMode.FDD 495 if band.is_tdd(): 496 return DuplexMode.TDD 497 if band.is_dl_only(): 498 return DuplexMode.DL_ONLY 499 500 def get_cc(self): 501 """Gets component carrier of the cell.""" 502 return self._cc 503 504 def is_on(self): 505 """Verifies if the cell is turned on. 506 507 Return: 508 boolean (if the cell is on). 509 """ 510 return self._cell.is_on() 511 512 def set_band(self, band): 513 """Sets the Band of cell. 514 515 Args: 516 band: band of cell. 517 """ 518 self._cell.set_band(band) 519 logger.info('The band is set to {} and is {} after setting'.format( 520 band, self.band)) 521 522 def set_dl_mac_padding(self, state): 523 """Enables/Disables downlink padding at the mac layer. 524 525 Args: 526 state: a boolean 527 """ 528 self._cc.set_dl_mac_padding(state) 529 530 def set_dl_power(self, pwlevel): 531 """Modifies RSPRE level. 532 533 Args: 534 pwlevel: power level in dBm. 535 """ 536 self._cell.set_total_dl_power(pwlevel) 537 538 def set_ul_power(self, ul_power): 539 """Sets ul power 540 541 Args: 542 ul_power: the uplink power in dbm 543 """ 544 self._cc.set_target_ul_power(ul_power) 545 546 def start(self): 547 """Starts the cell.""" 548 self._cell.start() 549 550 def stop(self): 551 """Stops the cell.""" 552 self._cell.stop() 553 554 def wait_cell_on(self, timeout): 555 """Waits the cell on. 556 557 Args: 558 timeout: the time for waiting the cell on. 559 560 Raises: 561 CmxError on time out. 562 """ 563 waiting_time = 0 564 while waiting_time < timeout: 565 if self._cell.is_on(): 566 return True 567 waiting_time += 1 568 time.sleep(1) 569 return self._cell.is_on() 570 571 572class LteBaseStation(BaseStation): 573 """ LTE base station.""" 574 def __init__(self, cmx, cell): 575 """Init method to setup variables for the LTE base station. 576 577 Args: 578 cmx: Controller (Cmx500) object. 579 cell: The cell for the LTE base station. 580 """ 581 from xlapi.lte_cell import LteCell 582 if not isinstance(cell, LteCell): 583 raise CmxError( 584 'The cell is not a LTE cell, LTE base station fails' 585 ' to create.') 586 super().__init__(cmx, cell) 587 588 def _config_scheduler(self, 589 dl_mcs=None, 590 dl_rb_alloc=None, 591 dl_dci_ncce=None, 592 dl_dci_format=None, 593 dl_tm=None, 594 dl_num_layers=None, 595 dl_mcs_table=None, 596 ul_mcs=None, 597 ul_rb_alloc=None, 598 ul_dci_ncce=None): 599 600 from rs_mrt.testenvironment.signaling.sri.rat.lte import DciFormat 601 from rs_mrt.testenvironment.signaling.sri.rat.lte import DlTransmissionMode 602 from rs_mrt.testenvironment.signaling.sri.rat.lte import MaxLayersMIMO 603 from rs_mrt.testenvironment.signaling.sri.rat.lte import McsTable 604 from rs_mrt.testenvironment.signaling.sri.rat.lte import PdcchFormat 605 606 log_list = [] 607 if dl_mcs: 608 log_list.append('dl_mcs: {}'.format(dl_mcs)) 609 if ul_mcs: 610 log_list.append('ul_mcs: {}'.format(ul_mcs)) 611 if dl_rb_alloc: 612 log_list.append('dl_rb_alloc: {}'.format(dl_rb_alloc)) 613 if ul_rb_alloc: 614 log_list.append('ul_rb_alloc: {}'.format(ul_rb_alloc)) 615 if dl_dci_ncce: 616 dl_dci_ncce = PdcchFormat(dl_dci_ncce) 617 log_list.append('dl_dci_ncce: {}'.format(dl_dci_ncce)) 618 if ul_dci_ncce: 619 ul_dci_ncce = PdcchFormat(ul_dci_ncce) 620 log_list.append('ul_dci_ncce: {}'.format(ul_dci_ncce)) 621 if dl_dci_format: 622 dl_dci_format = DciFormat(dl_dci_format) 623 log_list.append('dl_dci_format: {}'.format(dl_dci_format)) 624 if dl_tm: 625 dl_tm = DlTransmissionMode(dl_tm.value) 626 log_list.append('dl_tm: {}'.format(dl_tm)) 627 if dl_num_layers: 628 dl_num_layers = MaxLayersMIMO(dl_num_layers) 629 log_list.append('dl_num_layers: {}'.format(dl_num_layers)) 630 if dl_mcs_table: 631 dl_mcs_table = McsTable(dl_mcs_table) 632 log_list.append('dl_mcs_table: {}'.format(dl_mcs_table)) 633 634 num_crs_antenna_ports = self._cell.get_num_crs_antenna_ports() 635 636 # Sets num of crs antenna ports to 4 for configuring 637 self._cell.set_num_crs_antenna_ports(4) 638 scheduler = self._cmx.dut.get_scheduler(self._cell) 639 logger.info('configure scheduler for {}'.format(','.join(log_list))) 640 scheduler.configure_scheduler(dl_mcs=dl_mcs, 641 dl_rb_alloc=dl_rb_alloc, 642 dl_dci_ncce=dl_dci_ncce, 643 dl_dci_format=dl_dci_format, 644 dl_tm=dl_tm, 645 dl_num_layers=dl_num_layers, 646 dl_mcs_table=dl_mcs_table, 647 ul_mcs=ul_mcs, 648 ul_rb_alloc=ul_rb_alloc, 649 ul_dci_ncce=ul_dci_ncce) 650 logger.info('Configure scheduler succeeds') 651 652 # Sets num of crs antenna ports back to previous value 653 self._cell.set_num_crs_antenna_ports(num_crs_antenna_ports) 654 self._network.apply_changes() 655 656 @property 657 def bandwidth(self): 658 """Get the channel bandwidth of the cell. 659 660 Return: 661 the number rb of the bandwidth. 662 """ 663 return self._cell.get_bandwidth().num_rb 664 665 @property 666 def dl_channel(self): 667 """Gets the downlink channel of cell. 668 669 Return: 670 the downlink channel (earfcn) in int. 671 """ 672 return int(self._cell.get_dl_earfcn()) 673 674 @property 675 def dl_frequency(self): 676 """Get the downlink frequency of the cell.""" 677 from mrtype.frequency import Frequency 678 return self._cell.get_dl_earfcn().to_freq().in_units( 679 Frequency.Units.GHz) 680 681 def _to_rb_bandwidth(self, bandwidth): 682 for idx in range(5): 683 if bandwidth < LTE_MHZ_UPPER_BOUND_TO_RB[idx][0]: 684 return LTE_MHZ_UPPER_BOUND_TO_RB[idx][1] 685 return 100 686 687 def disable_all_ul_subframes(self): 688 """Disables all ul subframes for LTE cell.""" 689 self._cc.disable_all_ul_subframes() 690 self._network.apply_changes() 691 692 def set_bandwidth(self, bandwidth): 693 """Sets the channel bandwidth of the cell. 694 695 Args: 696 bandwidth: channel bandwidth of cell in MHz. 697 """ 698 self._cell.set_bandwidth(self._to_rb_bandwidth(bandwidth)) 699 self._network.apply_changes() 700 701 def set_cdrx_config(self): 702 """Sets LTE cdrx config for endc.""" 703 from mrtype.lte.drx import ( 704 LteDrxConfig, 705 LteDrxInactivityTimer, 706 LteDrxLongCycleStartOffset, 707 LteDrxOnDurationTimer, 708 LteDrxRetransmissionTimer, 709 ) 710 711 logger.info('Config Lte drx config') 712 lte_drx_config = LteDrxConfig( 713 on_duration_timer=LteDrxOnDurationTimer.PSF_10, 714 inactivity_timer=LteDrxInactivityTimer.PSF_200, 715 retransmission_timer=LteDrxRetransmissionTimer.PSF_33, 716 long_cycle_start_offset=LteDrxLongCycleStartOffset.ms160(83), 717 short_drx=None) 718 self._cmx.dut.lte_cell_group().set_drx_and_adjust_scheduler( 719 drx_config=lte_drx_config) 720 self._network.apply_changes() 721 722 def set_cell_frequency_band(self, tdd_cfg=None, ssf_cfg=None): 723 """Sets cell frequency band with tdd and ssf config. 724 725 Args: 726 tdd_cfg: the tdd subframe assignment config in number (from 0-6). 727 ssf_cfg: the special subframe pattern config in number (from 1-9). 728 """ 729 from rs_mrt.testenvironment.signaling.sri.rat.lte import SpecialSubframePattern 730 from rs_mrt.testenvironment.signaling.sri.rat.lte import SubFrameAssignment 731 from rs_mrt.testenvironment.signaling.sri.rat.lte.config import CellFrequencyBand 732 from rs_mrt.testenvironment.signaling.sri.rat.lte.config import Tdd 733 tdd_subframe = None 734 ssf_pattern = None 735 if tdd_cfg: 736 tdd_subframe = SubFrameAssignment(tdd_cfg + 1) 737 if ssf_cfg: 738 ssf_pattern = SpecialSubframePattern(ssf_cfg) 739 tdd = Tdd(tdd_config=Tdd.TddConfigSignaling( 740 subframe_assignment=tdd_subframe, 741 special_subframe_pattern=ssf_pattern)) 742 self._cell.stub.SetCellFrequencyBand(CellFrequencyBand(tdd=tdd)) 743 self._network.apply_changes() 744 745 def set_cfi(self, cfi): 746 """Sets number of pdcch symbols (cfi). 747 748 Args: 749 cfi: the value of NumberOfPdcchSymbols 750 """ 751 from rs_mrt.testenvironment.signaling.sri.rat.lte import NumberOfPdcchSymbols 752 from rs_mrt.testenvironment.signaling.sri.rat.lte.config import PdcchRegionReq 753 754 logger.info('The cfi enum to set is {}'.format( 755 NumberOfPdcchSymbols(cfi))) 756 req = PdcchRegionReq() 757 req.num_pdcch_symbols = NumberOfPdcchSymbols(cfi) 758 self._cell.stub.SetPdcchControlRegion(req) 759 760 def set_dci_format(self, dci_format): 761 """Selects the downlink control information (DCI) format. 762 763 Args: 764 dci_format: supported dci. 765 """ 766 if not isinstance(dci_format, DciFormat): 767 raise CmxError('Wrong type for dci_format') 768 self._config_scheduler(dl_dci_format=dci_format.value) 769 770 def set_dl_channel(self, channel): 771 """Sets the downlink channel number of cell. 772 773 Args: 774 channel: downlink channel number of cell. 775 """ 776 if self.dl_channel == channel: 777 logger.info('The dl_channel was at {}'.format(self.dl_channel)) 778 return 779 self._cell.set_earfcn(channel) 780 logger.info('The dl_channel was set to {}'.format(self.dl_channel)) 781 782 def set_dl_modulation_table(self, modulation): 783 """Sets down link modulation table. 784 785 Args: 786 modulation: modulation table setting (ModulationType). 787 """ 788 if not isinstance(modulation, ModulationType): 789 raise CmxError('The modulation is not the type of Modulation') 790 self._config_scheduler(dl_mcs_table=modulation.value) 791 792 def set_mimo_mode(self, mimo): 793 """Sets mimo mode for Lte scenario. 794 795 Args: 796 mimo: the mimo mode. 797 """ 798 if not isinstance(mimo, MimoModes): 799 raise CmxError("Wrong type of mimo mode") 800 801 self._cell.set_num_crs_antenna_ports(mimo.value) 802 self._config_scheduler(dl_num_layers=MIMO_MAX_LAYER_MAPPING[mimo]) 803 804 def set_scheduling_mode(self, 805 mcs_dl=None, 806 mcs_ul=None, 807 nrb_dl=None, 808 nrb_ul=None): 809 """Sets scheduling mode. 810 811 Args: 812 scheduling: the new scheduling mode. 813 mcs_dl: Downlink MCS. 814 mcs_ul: Uplink MCS. 815 nrb_dl: Number of RBs for downlink. 816 nrb_ul: Number of RBs for uplink. 817 """ 818 self._config_scheduler(dl_mcs=mcs_dl, 819 ul_mcs=mcs_ul, 820 dl_rb_alloc=nrb_dl, 821 ul_rb_alloc=nrb_ul) 822 823 def set_ssf_config(self, ssf_config): 824 """Sets ssf subframe assignment with tdd_config. 825 826 Args: 827 ssf_config: the special subframe pattern config (from 1-9). 828 """ 829 self.set_cell_frequency_band(ssf_cfg=ssf_config) 830 831 def set_tdd_config(self, tdd_config): 832 """Sets tdd subframe assignment with tdd_config. 833 834 Args: 835 tdd_config: the subframe assignemnt config (from 0-6). 836 """ 837 self.set_cell_frequency_band(tdd_cfg=tdd_config) 838 839 def set_transmission_mode(self, transmission_mode): 840 """Sets transmission mode with schedular. 841 842 Args: 843 transmission_mode: the download link transmission mode. 844 """ 845 from rs_mrt.testenvironment.signaling.sri.rat.lte import DlTransmissionMode 846 if not isinstance(transmission_mode, TransmissionModes): 847 raise CmxError('Wrong type of the trasmission mode') 848 dl_tm = DlTransmissionMode(transmission_mode.value) 849 logger.info('set dl tm to {}'.format(dl_tm)) 850 self._cc.set_dl_tm(dl_tm) 851 self._network.apply_changes() 852 853 def set_ul_channel(self, channel): 854 """Sets the up link channel number of cell. 855 856 Args: 857 channel: up link channel number of cell. 858 """ 859 if self.ul_channel == channel: 860 logger.info('The ul_channel is at {}'.format(self.ul_channel)) 861 return 862 self._cell.set_earfcn(channel) 863 logger.info('The dl_channel was set to {}'.format(self.ul_channel)) 864 865 @property 866 def ul_channel(self): 867 """Gets the uplink channel of cell. 868 869 Return: 870 the uplink channel (earfcn) in int 871 """ 872 return int(self._cell.get_ul_earfcn()) 873 874 @property 875 def ul_frequency(self): 876 """Get the uplink frequency of the cell. 877 878 Return: 879 The uplink frequency in GHz. 880 """ 881 from mrtype.frequency import Frequency 882 return self._cell.get_ul_earfcn().to_freq().in_units( 883 Frequency.Units.GHz) 884 885 def set_ul_modulation_table(self, modulation): 886 """Sets up link modulation table. 887 888 Args: 889 modulation: modulation table setting (ModulationType). 890 """ 891 if not isinstance(modulation, ModulationType): 892 raise CmxError('The modulation is not the type of Modulation') 893 if modulation == ModulationType.Q16: 894 self._cell.stub.SetPuschCommonConfig(False) 895 else: 896 self._cell.stub.SetPuschCommonConfig(True) 897 898 899class NrBaseStation(BaseStation): 900 """ NR base station.""" 901 def __init__(self, cmx, cell): 902 """Init method to setup variables for the NR base station. 903 904 Args: 905 cmx: Controller (Cmx500) object. 906 cell: The cell for the NR base station. 907 """ 908 from xlapi.nr_cell import NrCell 909 if not isinstance(cell, NrCell): 910 raise CmxError('the cell is not a NR cell, NR base station fails' 911 ' to creat.') 912 913 super().__init__(cmx, cell) 914 915 def _config_scheduler(self, 916 dl_mcs=None, 917 dl_mcs_table=None, 918 dl_rb_alloc=None, 919 dl_mimo_mode=None, 920 ul_mcs=None, 921 ul_mcs_table=None, 922 ul_rb_alloc=None, 923 ul_mimo_mode=None): 924 925 from rs_mrt.testenvironment.signaling.sri.rat.nr import McsTable 926 927 log_list = [] 928 if dl_mcs: 929 log_list.append('dl_mcs: {}'.format(dl_mcs)) 930 if ul_mcs: 931 log_list.append('ul_mcs: {}'.format(ul_mcs)) 932 933 # If rb alloc is not a tuple, add 0 as start RBs for XLAPI NR scheduler 934 if dl_rb_alloc: 935 if not isinstance(dl_rb_alloc, tuple): 936 dl_rb_alloc = (0, dl_rb_alloc) 937 log_list.append('dl_rb_alloc: {}'.format(dl_rb_alloc)) 938 if ul_rb_alloc: 939 if not isinstance(ul_rb_alloc, tuple): 940 ul_rb_alloc = (0, ul_rb_alloc) 941 log_list.append('ul_rb_alloc: {}'.format(ul_rb_alloc)) 942 if dl_mcs_table: 943 dl_mcs_table = McsTable(dl_mcs_table) 944 log_list.append('dl_mcs_table: {}'.format(dl_mcs_table)) 945 if ul_mcs_table: 946 ul_mcs_table = McsTable(ul_mcs_table) 947 log_list.append('ul_mcs_table: {}'.format(ul_mcs_table)) 948 if dl_mimo_mode: 949 log_list.append('dl_mimo_mode: {}'.format(dl_mimo_mode)) 950 if ul_mimo_mode: 951 log_list.append('ul_mimo_mode: {}'.format(ul_mimo_mode)) 952 953 scheduler = self._cmx.dut.get_scheduler(self._cell) 954 logger.info('configure scheduler for {}'.format(','.join(log_list))) 955 956 scheduler.configure_ue_scheduler(dl_mcs=dl_mcs, 957 dl_mcs_table=dl_mcs_table, 958 dl_rb_alloc=dl_rb_alloc, 959 dl_mimo_mode=dl_mimo_mode, 960 ul_mcs=ul_mcs, 961 ul_mcs_table=ul_mcs_table, 962 ul_rb_alloc=ul_rb_alloc, 963 ul_mimo_mode=ul_mimo_mode) 964 logger.info('Configure scheduler succeeds') 965 self._network.apply_changes() 966 967 def attach_as_secondary_cell(self, endc_timer=DEFAULT_ENDC_TIMER): 968 """Enable endc mode for NR cell. 969 970 Args: 971 endc_timer: timeout for endc state 972 """ 973 logger.info('enable endc mode for nsa dual connection') 974 self._cmx.dut.signaling.nsa_dual_connect(self._cell) 975 time_count = 0 976 while time_count < endc_timer: 977 if str(self._cmx.dut.state.radio_connectivity) == \ 978 'RadioConnectivityMode.EPS_LTE_NR': 979 logger.info('enter endc mode') 980 return 981 time.sleep(1) 982 time_count += 1 983 if time_count % 30 == 0: 984 logger.info('did not reach endc at {} s'.format(time_count)) 985 raise CmxError('Cannot reach endc after {} s'.format(endc_timer)) 986 987 def config_flexible_slots(self): 988 """Configs flexible slots for NR cell.""" 989 990 from rs_mrt.testenvironment.signaling.sri.rat.nr.config import CellFrequencyBandReq 991 from rs_mrt.testenvironment.signaling.sri.rat.common import SetupRelease 992 993 logger.info('Config flexible slots') 994 req = CellFrequencyBandReq() 995 req.band.frequency_range.fr1.tdd.tdd_config_common.setup_release = SetupRelease.RELEASE 996 self._cell.stub.SetCellFrequencyBand(req) 997 self._network.apply_changes() 998 999 def disable_all_ul_slots(self): 1000 """Disables all ul slots for NR cell""" 1001 self._cc.disable_all_ul_slots() 1002 1003 @property 1004 def dl_channel(self): 1005 """Gets the downlink channel of cell. 1006 1007 Return: 1008 the downlink channel (nr_arfcn) in int. 1009 """ 1010 return int(self._cell.get_dl_ref_a()) 1011 1012 def _bandwidth_to_carrier_bandwidth(self, bandwidth): 1013 """Converts bandwidth in MHz to CarrierBandwidth. 1014 CarrierBandwidth Enum in XLAPI: 1015 MHZ_5 = 0 1016 MHZ_10 = 1 1017 MHZ_15 = 2 1018 MHZ_20 = 3 1019 MHZ_25 = 4 1020 MHZ_30 = 5 1021 MHZ_40 = 6 1022 MHZ_50 = 7 1023 MHZ_60 = 8 1024 MHZ_70 = 9 1025 MHZ_80 = 10 1026 MHZ_90 = 11 1027 MHZ_100 = 12 1028 MHZ_200 = 13 1029 MHZ_400 = 14 1030 Args: 1031 bandwidth: channel bandwidth in MHz. 1032 1033 Return: 1034 the corresponding NR Carrier Bandwidth. 1035 """ 1036 from mrtype.nr.frequency import CarrierBandwidth 1037 if bandwidth > 100: 1038 return CarrierBandwidth(12 + bandwidth // 200) 1039 elif bandwidth > 30: 1040 return CarrierBandwidth(2 + bandwidth // 10) 1041 else: 1042 return CarrierBandwidth(bandwidth // 5 - 1) 1043 1044 def set_bandwidth(self, bandwidth, scs=None): 1045 """Sets the channel bandwidth of the cell. 1046 1047 Args: 1048 bandwidth: channel bandwidth of cell. 1049 scs: subcarrier spacing (SCS) of resource grid 0 1050 """ 1051 if not scs: 1052 scs = self._cell.get_scs() 1053 self._cell.set_carrier_bandwidth_and_scs( 1054 self._bandwidth_to_carrier_bandwidth(bandwidth), scs) 1055 logger.info( 1056 'The bandwidth in MHz is {}. After setting, the value is {}'. 1057 format(bandwidth, str(self._cell.get_carrier_bandwidth()))) 1058 1059 def set_dl_channel(self, channel): 1060 """Sets the downlink channel number of cell. 1061 1062 Args: 1063 channel: downlink channel number of cell or Frequency Range ('LOW', 1064 'MID' or 'HIGH'). 1065 """ 1066 from mrtype.nr.frequency import NrArfcn 1067 from mrtype.frequency import FrequencyRange 1068 1069 # When the channel is string, set it use Frenquency Range 1070 if isinstance(channel, str): 1071 logger.info('Sets dl channel Frequency Range {}'.format(channel)) 1072 frequency_range = FrequencyRange.LOW 1073 if channel.upper() == 'MID': 1074 frequency_range = FrequencyRange.MID 1075 elif channel.upper() == 'HIGH': 1076 frequency_range = FrequencyRange.HIGH 1077 self._cell.set_dl_ref_a_offset(self.band, frequency_range) 1078 logger.info('The dl_channel was set to {}'.format(self.dl_channel)) 1079 return 1080 if self.dl_channel == channel: 1081 logger.info('The dl_channel was at {}'.format(self.dl_channel)) 1082 return 1083 self._cell.set_dl_ref_a_offset(self.band, NrArfcn(channel)) 1084 logger.info('The dl_channel was set to {}'.format(self.dl_channel)) 1085 1086 def set_dl_modulation_table(self, modulation): 1087 """Sets down link modulation table. 1088 1089 Args: 1090 modulation: modulation table setting (ModulationType). 1091 """ 1092 if not isinstance(modulation, ModulationType): 1093 raise CmxError('The modulation is not the type of Modulation') 1094 self._config_scheduler(dl_mcs_table=modulation.value) 1095 1096 def set_mimo_mode(self, mimo): 1097 """Sets mimo mode for NR nsa scenario. 1098 1099 Args: 1100 mimo: the mimo mode. 1101 """ 1102 from rs_mrt.testenvironment.signaling.sri.rat.nr import DownlinkMimoMode 1103 if not isinstance(mimo, MimoModes): 1104 raise CmxError("Wrong type of mimo mode") 1105 1106 self._config_scheduler(dl_mimo_mode=DownlinkMimoMode.Enum(mimo.value)) 1107 1108 def set_scheduling_mode(self, 1109 mcs_dl=None, 1110 mcs_ul=None, 1111 nrb_dl=None, 1112 nrb_ul=None): 1113 """Sets scheduling mode. 1114 1115 Args: 1116 mcs_dl: Downlink MCS. 1117 mcs_ul: Uplink MCS. 1118 nrb_dl: Number of RBs for downlink. 1119 nrb_ul: Number of RBs for uplink. 1120 """ 1121 self._config_scheduler(dl_mcs=mcs_dl, 1122 ul_mcs=mcs_ul, 1123 dl_rb_alloc=nrb_dl, 1124 ul_rb_alloc=nrb_ul) 1125 1126 def set_ssf_config(self, ssf_config): 1127 """Sets ssf subframe assignment with tdd_config. 1128 1129 Args: 1130 ssf_config: the special subframe pattern config (from 1-9). 1131 """ 1132 raise CmxError('the set ssf config for nr did not implemente yet') 1133 1134 def set_tdd_config(self, tdd_config): 1135 """Sets tdd subframe assignment with tdd_config. 1136 1137 Args: 1138 tdd_config: the subframe assignemnt config (from 0-6). 1139 """ 1140 raise CmxError('the set tdd config for nr did not implemente yet') 1141 1142 def set_transmission_mode(self, transmission_mode): 1143 """Sets transmission mode with schedular. 1144 1145 Args: 1146 transmission_mode: the download link transmission mode. 1147 """ 1148 logger.info('The set transmission mode for nr is set by mimo mode') 1149 1150 def set_ul_modulation_table(self, modulation): 1151 """Sets down link modulation table. 1152 1153 Args: 1154 modulation: modulation table setting (ModulationType). 1155 """ 1156 if not isinstance(modulation, ModulationType): 1157 raise CmxError('The modulation is not the type of Modulation') 1158 self._config_scheduler(ul_mcs_table=modulation.value) 1159 1160 1161class CmxError(Exception): 1162 """Class to raise exceptions related to cmx.""" 1163