1#!/usr/bin/env python3 2# 3# Copyright 2021 - The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the 'License'); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an 'AS IS' BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16 17import time 18from enum import Enum 19 20from acts.controllers.rohdeschwarz_lib import cmx500 21from acts.controllers.rohdeschwarz_lib.cmx500 import LteBandwidth 22from acts.controllers.rohdeschwarz_lib.cmx500 import LteState 23from acts.controllers import cellular_simulator as cc 24from acts.controllers.cellular_lib import LteSimulation 25 26CMX_TM_MAPPING = { 27 LteSimulation.TransmissionMode.TM1: cmx500.TransmissionModes.TM1, 28 LteSimulation.TransmissionMode.TM2: cmx500.TransmissionModes.TM2, 29 LteSimulation.TransmissionMode.TM3: cmx500.TransmissionModes.TM3, 30 LteSimulation.TransmissionMode.TM4: cmx500.TransmissionModes.TM4, 31 LteSimulation.TransmissionMode.TM7: cmx500.TransmissionModes.TM7, 32 LteSimulation.TransmissionMode.TM8: cmx500.TransmissionModes.TM8, 33 LteSimulation.TransmissionMode.TM9: cmx500.TransmissionModes.TM9, 34} 35 36CMX_SCH_MAPPING = { 37 LteSimulation.SchedulingMode.STATIC: cmx500.SchedulingMode.USERDEFINEDCH 38} 39 40CMX_MIMO_MAPPING = { 41 LteSimulation.MimoMode.MIMO_1x1: cmx500.MimoModes.MIMO1x1, 42 LteSimulation.MimoMode.MIMO_2x2: cmx500.MimoModes.MIMO2x2, 43 LteSimulation.MimoMode.MIMO_4x4: cmx500.MimoModes.MIMO4x4, 44} 45 46 47class ConfigurationMode(Enum): 48 Power = "Power" 49 50 51class CMX500CellularSimulator(cc.AbstractCellularSimulator): 52 """ A cellular simulator for telephony simulations based on the CMX 500 53 controller. """ 54 55 # The maximum power that the equipment is able to transmit 56 MAX_DL_POWER = -25 57 58 def __init__(self, 59 ip_address, 60 port='5025', 61 config_mode=ConfigurationMode.Power): 62 """ Initializes the cellular simulator. 63 64 Args: 65 ip_address: the ip address of the CMX500 66 port: the port number for the CMX500 controller 67 config_mode: A pre-defined configuration mode to use. 68 """ 69 super().__init__() 70 try: 71 self.cmx = cmx500.Cmx500(ip_address, port) 72 except: 73 raise cc.CellularSimulatorError('Error when Initializes CMX500.') 74 75 self._config_mode = config_mode 76 self.bts = self.cmx.bts 77 78 def destroy(self): 79 """ Sends finalization commands to the cellular equipment and closes 80 the connection. """ 81 self.log.info('destroy the cmx500 simulator') 82 self.cmx.disconnect() 83 84 def setup_lte_scenario(self): 85 """ Configures the equipment for an LTE simulation. """ 86 self.log.info('setup lte scenario') 87 self.cmx.switch_lte_signalling(cmx500.LteState.LTE_ON) 88 89 def setup_nr_sa_scenario(self): 90 """ Configures the equipment for an NR stand alone simulation. """ 91 raise NotImplementedError() 92 93 def setup_nr_nsa_scenario(self): 94 """ Configures the equipment for an NR non stand alone simulation. """ 95 self.log.info('setup nsa scenario (start lte cell and nr cell') 96 self.cmx.switch_on_nsa_signalling() 97 98 def set_band_combination(self, bands, mimo_modes): 99 """ Prepares the test equipment for the indicated band/mimo combination. 100 101 Args: 102 bands: a list of bands represented as ints or strings 103 mimo_modes: a list of LteSimulation.MimoMode to use for each carrier 104 """ 105 self.num_carriers = len(bands) 106 107 def set_lte_rrc_state_change_timer(self, enabled, time=10): 108 """ Configures the LTE RRC state change timer. 109 110 Args: 111 enabled: a boolean indicating if the timer should be on or off. 112 time: time in seconds for the timer to expire 113 """ 114 self.log.info('set timer enabled to {} and the time to {}'.format( 115 enabled, time)) 116 self.cmx.rrc_state_change_time_enable = enabled 117 self.cmx.lte_rrc_state_change_timer = time 118 119 def set_band(self, bts_index, band): 120 """ Sets the band for the indicated base station. 121 122 Args: 123 bts_index: the base station number 124 band: the new band 125 """ 126 self.log.info('set band to {}'.format(band)) 127 self.bts[bts_index].set_band(int(band)) 128 129 def get_duplex_mode(self, band): 130 """ Determines if the band uses FDD or TDD duplex mode 131 132 Args: 133 band: a band number 134 135 Returns: 136 an variable of class DuplexMode indicating if band is FDD or TDD 137 """ 138 if 33 <= int(band) <= 46: 139 return cmx500.DuplexMode.TDD 140 else: 141 return cmx500.DuplexMode.FDD 142 143 def set_input_power(self, bts_index, input_power): 144 """ Sets the input power for the indicated base station. 145 146 Args: 147 bts_index: the base station number 148 input_power: the new input power 149 """ 150 if input_power > 23: 151 self.log.warning('Open loop supports -50dBm to 23 dBm. ' 152 'Setting it to max power 23 dBm') 153 input_power = 23 154 self.log.info('set input power to {}'.format(input_power)) 155 self.bts[bts_index].set_ul_power(input_power) 156 157 def set_output_power(self, bts_index, output_power): 158 """ Sets the output power for the indicated base station. 159 160 Args: 161 bts_index: the base station number 162 output_power: the new output power 163 """ 164 self.log.info('set output power to {}'.format(output_power)) 165 self.bts[bts_index].set_dl_power(output_power) 166 167 def set_tdd_config(self, bts_index, tdd_config): 168 """ Sets the tdd configuration number for the indicated base station. 169 170 Args: 171 bts_index: the base station number 172 tdd_config: the new tdd configuration number (from 0 to 6) 173 """ 174 self.log.info('set tdd config to {}'.format(tdd_config)) 175 self.bts[bts_index].set_tdd_config(tdd_config) 176 177 def set_ssf_config(self, bts_index, ssf_config): 178 """ Sets the Special Sub-Frame config number for the indicated 179 base station. 180 181 Args: 182 bts_index: the base station number 183 ssf_config: the new ssf config number (from 0 to 9) 184 """ 185 self.log.info('set ssf config to {}'.format(ssf_config)) 186 self.bts[bts_index].set_ssf_config(ssf_config) 187 188 def set_bandwidth(self, bts_index, bandwidth): 189 """ Sets the bandwidth for the indicated base station. 190 191 Args: 192 bts_index: the base station number 193 bandwidth: the new bandwidth in MHz 194 """ 195 self.log.info('set bandwidth of bts {} to {}'.format( 196 bts_index, bandwidth)) 197 self.bts[bts_index].set_bandwidth(int(bandwidth)) 198 199 def set_downlink_channel_number(self, bts_index, channel_number): 200 """ Sets the downlink channel number for the indicated base station. 201 202 Args: 203 bts_index: the base station number 204 channel_number: the new channel number (earfcn) 205 """ 206 self.log.info( 207 'Sets the downlink channel number to {}'.format(channel_number)) 208 self.bts[bts_index].set_dl_channel(channel_number) 209 210 def set_mimo_mode(self, bts_index, mimo_mode): 211 """ Sets the mimo mode for the indicated base station. 212 213 Args: 214 bts_index: the base station number 215 mimo_mode: the new mimo mode 216 """ 217 self.log.info('set mimo mode to {}'.format(mimo_mode)) 218 mimo_mode = CMX_MIMO_MAPPING[mimo_mode] 219 self.bts[bts_index].set_mimo_mode(mimo_mode) 220 221 def set_transmission_mode(self, bts_index, tmode): 222 """ Sets the transmission mode for the indicated base station. 223 224 Args: 225 bts_index: the base station number 226 tmode: the new transmission mode 227 """ 228 self.log.info('set TransmissionMode to {}'.format(tmode)) 229 tmode = CMX_TM_MAPPING[tmode] 230 self.bts[bts_index].set_transmission_mode(tmode) 231 232 def set_scheduling_mode(self, 233 bts_index, 234 scheduling, 235 mcs_dl=None, 236 mcs_ul=None, 237 nrb_dl=None, 238 nrb_ul=None): 239 """ Sets the scheduling mode for the indicated base station. 240 241 Args: 242 bts_index: the base station number. 243 scheduling: the new scheduling mode. 244 mcs_dl: Downlink MCS. 245 mcs_ul: Uplink MCS. 246 nrb_dl: Number of RBs for downlink. 247 nrb_ul: Number of RBs for uplink. 248 """ 249 if scheduling not in CMX_SCH_MAPPING: 250 raise cc.CellularSimulatorError( 251 "This scheduling mode is not supported") 252 log_list = [] 253 if mcs_dl: 254 log_list.append('mcs_dl: {}'.format(mcs_dl)) 255 if mcs_ul: 256 log_list.append('mcs_ul: {}'.format(mcs_ul)) 257 if nrb_dl: 258 log_list.append('nrb_dl: {}'.format(nrb_dl)) 259 if nrb_ul: 260 log_list.append('nrb_ul: {}'.format(nrb_ul)) 261 262 self.log.info('set scheduling mode to {}'.format(','.join(log_list))) 263 self.bts[bts_index].set_scheduling_mode(mcs_dl=mcs_dl, 264 mcs_ul=mcs_ul, 265 nrb_dl=nrb_dl, 266 nrb_ul=nrb_ul) 267 268 def set_dl_256_qam_enabled(self, bts_index, enabled): 269 """ Determines what MCS table should be used for the downlink. 270 271 Args: 272 bts_index: the base station number 273 enabled: whether 256 QAM should be used 274 """ 275 self.log.info('Set 256 QAM DL MCS enabled: ' + str(enabled)) 276 self.bts[bts_index].set_dl_modulation_table( 277 cmx500.ModulationType.Q256 if enabled else cmx500.ModulationType. 278 Q64) 279 280 def set_ul_64_qam_enabled(self, bts_index, enabled): 281 """ Determines what MCS table should be used for the uplink. 282 283 Args: 284 bts_index: the base station number 285 enabled: whether 64 QAM should be used 286 """ 287 self.log.info('Set 64 QAM UL MCS enabled: ' + str(enabled)) 288 self.bts[bts_index].set_ul_modulation_table( 289 cmx500.ModulationType.Q64 if enabled else cmx500.ModulationType.Q16 290 ) 291 292 def set_mac_padding(self, bts_index, mac_padding): 293 """ Enables or disables MAC padding in the indicated base station. 294 295 Args: 296 bts_index: the base station number 297 mac_padding: the new MAC padding setting 298 """ 299 self.log.info('set mac pad on {}'.format(mac_padding)) 300 self.bts[bts_index].set_dl_mac_padding(mac_padding) 301 302 def set_cfi(self, bts_index, cfi): 303 """ Sets the Channel Format Indicator for the indicated base station. 304 305 Args: 306 bts_index: the base station number 307 cfi: the new CFI setting 308 """ 309 if cfi == 'BESTEFFORT': 310 self.log.info('The cfi is BESTEFFORT, use default value') 311 return 312 try: 313 index = int(cfi) + 1 314 except Exception as e: 315 index = 1 316 finally: 317 self.log.info('set the cfi and the cfi index is {}'.format(index)) 318 self.bts[bts_index].set_cfi(index) 319 320 def set_paging_cycle(self, bts_index, cycle_duration): 321 """ Sets the paging cycle duration for the indicated base station. 322 323 Args: 324 bts_index: the base station number 325 cycle_duration: the new paging cycle duration in milliseconds 326 """ 327 self.log.warning('The set_paging_cycle method is not implememted, ' 328 'use default value') 329 330 def set_phich_resource(self, bts_index, phich): 331 """ Sets the PHICH Resource setting for the indicated base station. 332 333 Args: 334 bts_index: the base station number 335 phich: the new PHICH resource setting 336 """ 337 self.log.warning('The set_phich_resource method is not implememted, ' 338 'use default value') 339 340 def lte_attach_secondary_carriers(self, ue_capability_enquiry): 341 """ Activates the secondary carriers for CA. Requires the DUT to be 342 attached to the primary carrier first. 343 344 Args: 345 ue_capability_enquiry: UE capability enquiry message to be sent to 346 the UE before starting carrier aggregation. 347 """ 348 self.wait_until_communication_state() 349 self.bts[1].attach_as_secondary_cell() 350 time.sleep(10) 351 352 if self._config_mode and self._config_mode == ConfigurationMode.Power: 353 self.configure_for_power_measurement() 354 355 self.log.info('The radio connectivity is {}'.format( 356 self.cmx.dut.state.radio_connectivity)) 357 358 def configure_for_power_measurement(self): 359 """ Applies a pre-defined configuration for PDCCH power testing.""" 360 self.log.info('set lte cdrx for nr nsa scenario') 361 self.bts[0].set_cdrx_config() 362 time.sleep(5) 363 364 self.log.info('Disables mac padding') 365 self.bts[0].set_dl_mac_padding(False) 366 self.bts[1].set_dl_mac_padding(False) 367 time.sleep(5) 368 369 self.log.info('configure flexible slots and wait for 5 seconds') 370 self.bts[1].config_flexible_slots() 371 time.sleep(5) 372 373 self.log.info('disable all ul subframes of the lte cell') 374 self.bts[0].disable_all_ul_subframes() 375 time.sleep(30) 376 377 self.log.info('Disables Nr UL slots') 378 self.bts[1].disable_all_ul_slots() 379 time.sleep(5) 380 381 def wait_until_attached(self, timeout=120): 382 """ Waits until the DUT is attached to the primary carrier. 383 384 Args: 385 timeout: after this amount of time the method will raise a 386 CellularSimulatorError exception. Default is 120 seconds. 387 """ 388 self.log.info('wait until attached') 389 self.cmx.wait_until_attached(timeout) 390 391 def wait_until_communication_state(self, timeout=120): 392 """ Waits until the DUT is in Communication state. 393 394 Args: 395 timeout: after this amount of time the method will raise a 396 CellularSimulatorError exception. Default is 120 seconds. 397 Return: 398 True if cmx reach rrc state within timeout 399 Raise: 400 CmxError if tiemout 401 """ 402 self.log.info('wait for rrc on state') 403 return self.cmx.wait_for_rrc_state(cmx500.RrcState.RRC_ON, timeout) 404 405 def wait_until_idle_state(self, timeout=120): 406 """ Waits until the DUT is in Idle state. 407 408 Args: 409 timeout: after this amount of time the method will raise a 410 CellularSimulatorError exception. Default is 120 seconds. 411 Return: 412 True if cmx reach rrc state within timeout 413 Raise: 414 CmxError if tiemout 415 """ 416 self.log.info('wait for rrc off state') 417 return self.cmx.wait_for_rrc_state(cmx500.RrcState.RRC_OFF, timeout) 418 419 def wait_until_quiet(self, timeout=120): 420 """Waits for all pending operations to finish on the simulator. 421 422 Args: 423 timeout: after this amount of time the method will raise a 424 CellularSimulatorError exception. Default is 120 seconds. 425 """ 426 self.cmx._network.apply_changes() 427 428 def detach(self): 429 """ Turns off all the base stations so the DUT loose connection.""" 430 self.log.info('Bypass simulator detach step for now') 431 432 def stop(self): 433 """ Stops current simulation. After calling this method, the simulator 434 will need to be set up again. """ 435 self.log.info('Stops current simulation and disconnect cmx500') 436 self.cmx.disconnect() 437 438 def start_data_traffic(self): 439 """ Starts transmitting data from the instrument to the DUT. """ 440 self.log.warning('The start_data_traffic is not implemented yet') 441 442 def stop_data_traffic(self): 443 """ Stops transmitting data from the instrument to the DUT. """ 444 self.log.warning('The stop_data_traffic is not implemented yet') 445 446 def send_sms(self, message): 447 """ Sends an SMS message to the DUT. 448 449 Args: 450 message: the SMS message to send. 451 """ 452 self.cmx.send_sms(message) 453