# Copyright 2021 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import logging
import time

from autotest_lib.server.cros.cellular import cellular_simulator as cc
from autotest_lib.server.cros.cellular.callbox_utils import cmw500
from autotest_lib.server.cros.cellular.simulation_utils import LteSimulation

# Translates LteSimulation transmission modes into CMW500 transmission modes.
CMW_TM_MAPPING = {
        LteSimulation.TransmissionMode.TM1: cmw500.TransmissionModes.TM1,
        LteSimulation.TransmissionMode.TM2: cmw500.TransmissionModes.TM2,
        LteSimulation.TransmissionMode.TM3: cmw500.TransmissionModes.TM3,
        LteSimulation.TransmissionMode.TM4: cmw500.TransmissionModes.TM4,
        LteSimulation.TransmissionMode.TM7: cmw500.TransmissionModes.TM7,
        LteSimulation.TransmissionMode.TM8: cmw500.TransmissionModes.TM8,
        LteSimulation.TransmissionMode.TM9: cmw500.TransmissionModes.TM9
}

# Translates LteSimulation scheduling modes into CMW500 scheduling modes.
# Only STATIC is mapped; any other mode raises KeyError on lookup.
CMW_SCH_MAPPING = {
        LteSimulation.SchedulingMode.STATIC:
        cmw500.SchedulingMode.USERDEFINEDCH
}

# Translates LteSimulation MIMO modes into CMW500 MIMO modes.
CMW_MIMO_MAPPING = {
        LteSimulation.MimoMode.MIMO_1x1: cmw500.MimoModes.MIMO1x1,
        LteSimulation.MimoMode.MIMO_2x2: cmw500.MimoModes.MIMO2x2,
        LteSimulation.MimoMode.MIMO_4x4: cmw500.MimoModes.MIMO4x4
}

# Translates LteSimulation modulation types into CMW500 modulation types.
CMW_MODULATION_MAPPING = {
        LteSimulation.ModulationType.QPSK: cmw500.ModulationType.QPSK,
        LteSimulation.ModulationType.Q16: cmw500.ModulationType.Q16,
        LteSimulation.ModulationType.Q64: cmw500.ModulationType.Q64,
        LteSimulation.ModulationType.Q256: cmw500.ModulationType.Q256
}

# get mcs vs tbsi map with 256-qam disabled(downlink)
# Layout: {modulation: {mcs: tbs index}}.
get_mcs_tbsi_map_dl = {
        cmw500.ModulationType.QPSK: {
                0: 0,
                1: 1,
                2: 2,
                3: 3,
                4: 4,
                5: 5,
                6: 6,
                7: 7,
                8: 8,
                9: 9
        },
        cmw500.ModulationType.Q16: {
                10: 9,
                11: 10,
                12: 11,
                13: 12,
                14: 13,
                15: 14,
                16: 15
        },
        cmw500.ModulationType.Q64: {
                17: 15,
                18: 16,
                19: 17,
                20: 18,
                21: 19,
                22: 20,
                23: 21,
                24: 22,
                25: 23,
                26: 24,
                27: 25,
                28: 26
        }
}

# get mcs vs tbsi map with 256-qam enabled(downlink)
# Layout: {modulation: {mcs: tbs index}}.
get_mcs_tbsi_map_for_256qam_dl = {
        cmw500.ModulationType.QPSK: {
                0: 0,
                1: 2,
                2: 4,
                3: 6,
                4: 8,
        },
        cmw500.ModulationType.Q16: {
                5: 10,
                6: 11,
                7: 12,
                8: 13,
                9: 14,
                10: 15
        },
        cmw500.ModulationType.Q64: {
                11: 16,
                12: 17,
                13: 18,
                14: 19,
                15: 20,
                16: 21,
                17: 22,
                18: 23,
                19: 24
        },
        cmw500.ModulationType.Q256: {
                20: 25,
                21: 27,
                22: 28,
                23: 29,
                24: 30,
                25: 31,
                26: 32,
                27: 33
        }
}

# get mcs vs tbsi map (uplink)
# Layout: {modulation: {mcs: tbs index}}.
# NOTE(review): there is no Q64 entry here although LTE_SUPPORTS_UL_64QAM is
# True below; a Q64 uplink modulation makes set_scheduling_mode raise
# KeyError on this table. Confirm whether that is intended.
get_mcs_tbsi_map_ul = {
        cmw500.ModulationType.QPSK: {
                0: 0,
                1: 1,
                2: 2,
                3: 3,
                4: 4,
                5: 5,
                6: 6,
                7: 7,
                8: 8,
                9: 9
        },
        cmw500.ModulationType.Q16: {
                10: 10,
                11: 10,
                12: 11,
                13: 12,
                14: 13,
                15: 14,
                16: 15,
                17: 16,
                18: 17,
                19: 18,
                20: 19,
                21: 19,
                22: 20,
                23: 21,
                24: 22,
                25: 23,
                26: 24,
                27: 25,
                28: 26
        }
}


class CMW500CellularSimulator(cc.AbstractCellularSimulator):
    """ A cellular simulator for telephony simulations based on the CMW 500
    controller. """

    # Indicates if it is able to use 256 QAM as the downlink modulation for LTE
    LTE_SUPPORTS_DL_256QAM = True

    # Indicates if it is able to use 64 QAM as the uplink modulation for LTE
    LTE_SUPPORTS_UL_64QAM = True

    # Indicates if 4x4 MIMO is supported for LTE
    LTE_SUPPORTS_4X4_MIMO = True

    # The maximum number of carriers that this simulator can support for LTE
    LTE_MAX_CARRIERS = 1

    def __init__(self, ip_address, port):
        """ Initializes the cellular simulator.

        Args:
            ip_address: the ip address of the CMW500
            port: the port number for the CMW500 controller

        Raises:
            CellularSimulatorError: if the cmw500 controller raises CmwError
                while connecting.
        """

        try:
            self.cmw = cmw500.Cmw500(ip_address, port)
        except cmw500.CmwError:
            raise cc.CellularSimulatorError('Could not connect to CMW500.')

        # Populated by setup_lte_scenario().
        self.bts = None
        self.log = logging.getLogger(__name__)
        # Stored by set_dl_modulation() / set_ul_modulation() and applied
        # later by set_scheduling_mode().
        self.dl_modulation = None
        self.ul_modulation = None

    def destroy(self):
        """ Sends finalization commands to the cellular equipment and closes
        the connection. """
        self.cmw.disconnect()

    def setup_lte_scenario(self):
        """ Configures the equipment for an LTE simulation. """
        self.cmw.connection_type = cmw500.ConnectionType.DAU
        self.bts = [self.cmw.get_base_station()]
        self.cmw.switch_lte_signalling(cmw500.LteState.LTE_ON)

    def setup_lte_ca_scenario(self):
        """ Configures the equipment for an LTE with CA simulation.

        Raises:
            NotImplementedError: always; not implemented for this simulator.
        """
        raise NotImplementedError()

    def set_lte_rrc_state_change_timer(self, enabled, time=10):
        """ Configures the LTE RRC state change timer.

        Args:
            enabled: a boolean indicating if the timer should be on or off.
            time: time in seconds for the timer to expire
        """
        # NOTE: the 'time' parameter shadows the 'time' module inside this
        # method; it is kept because it is part of the public signature.
        if enabled:
            # NOTE(review): presumably RRC_OFF stops the instrument from
            # keeping the RRC connection up so the timer takes effect -
            # confirm against the cmw500 controller.
            self.cmw.rrc_connection = cmw500.RrcState.RRC_OFF
            self.cmw.rrc_connection_timer = time
        else:
            self.cmw.rrc_connection = cmw500.RrcState.RRC_ON

    def set_band(self, bts_index, band):
        """ Sets the band for the indicated base station.

        The duplex mode is derived from the band number and set first.

        Args:
            bts_index: the base station number
            band: the new band, as a number or a numeric string
        """
        bts = self.bts[bts_index]
        bts.duplex_mode = self.get_duplex_mode(band)
        # format() rather than '+' so that an integer band works here just
        # like it does in get_duplex_mode().
        band = 'OB{}'.format(band)
        bts.band = band
        self.log.debug('Band set to {}'.format(band))

    def get_duplex_mode(self, band):
        """ Determines if the band uses FDD or TDD duplex mode

        Args:
            band: a band number

        Returns:
            an variable of class DuplexMode indicating if band is FDD or TDD
        """
        # Bands 33 to 46 (inclusive) are treated as TDD, the rest as FDD.
        if 33 <= int(band) <= 46:
            return cmw500.DuplexMode.TDD
        return cmw500.DuplexMode.FDD

    def set_input_power(self, bts_index, input_power):
        """ Sets the input power for the indicated base station.

        Values above 23 are clamped to 23 and a warning is logged.

        Args:
            bts_index: the base station number
            input_power: the new input power
        """
        bts = self.bts[bts_index]
        if input_power > 23:
            self.log.warning('Open loop supports-50dBm to 23 dBm. '
                             'Setting it to max power 23 dBm')
            input_power = 23
        bts.uplink_power_control = input_power
        bts.tpc_power_control = cmw500.TpcPowerControl.CLOSED_LOOP
        bts.tpc_closed_loop_target_power = input_power

    def set_output_power(self, bts_index, output_power):
        """ Sets the output power for the indicated base station.

        Args:
            bts_index: the base station number
            output_power: the new output power
        """
        self.bts[bts_index].downlink_power_level = output_power

    def set_tdd_config(self, bts_index, tdd_config):
        """ Sets the tdd configuration number for the indicated base station.

        Args:
            bts_index: the base station number
            tdd_config: the new tdd configuration number
        """
        self.bts[bts_index].uldl_configuration = tdd_config

    def set_ssf_config(self, bts_index, ssf_config):
        """ Sets the Special Sub-Frame config number for the indicated
        base station.

        Args:
            bts_index: the base station number
            ssf_config: the new ssf config number

        Raises:
            ValueError: if ssf_config is not between 0 and 9.
        """
        if not 0 <= ssf_config <= 9:
            raise ValueError('The Special Sub-Frame configuration has to be a '
                             'number between 0 and 9.')

        self.bts[bts_index].tdd_special_subframe = ssf_config

    def set_bandwidth(self, bts_index, bandwidth):
        """ Sets the bandwidth for the indicated base station.

        Args:
            bts_index: the base station number
            bandwidth: the new bandwidth in MHz (20, 15, 10, 5, 3 or 1.4)

        Raises:
            ValueError: if the bandwidth is not one of the values above.
        """
        bts = self.bts[bts_index]

        bandwidth_settings = {
                20: cmw500.LteBandwidth.BANDWIDTH_20MHz,
                15: cmw500.LteBandwidth.BANDWIDTH_15MHz,
                10: cmw500.LteBandwidth.BANDWIDTH_10MHz,
                5: cmw500.LteBandwidth.BANDWIDTH_5MHz,
                3: cmw500.LteBandwidth.BANDWIDTH_3MHz,
                # 1.4 MHz is represented by the BANDWIDTH_1MHz member.
                1.4: cmw500.LteBandwidth.BANDWIDTH_1MHz,
        }
        setting = bandwidth_settings.get(bandwidth)
        if setting is None:
            msg = 'Bandwidth {} MHz is not valid for LTE'.format(bandwidth)
            raise ValueError(msg)

        bts.bandwidth = setting

    def set_downlink_channel_number(self, bts_index, channel_number):
        """ Sets the downlink channel number for the indicated base station.

        Args:
            bts_index: the base station number
            channel_number: the new channel number
        """
        bts = self.bts[bts_index]
        bts.dl_channel = channel_number
        # The value is read back from the base station object for the log.
        self.log.debug('Downlink Channel set to {}'.format(bts.dl_channel))

    def set_mimo_mode(self, bts_index, mimo_mode):
        """ Sets the mimo mode for the indicated base station.

        Configures the matching MIMO scenario on the instrument and then the
        downlink antenna setting of the base station.

        Args:
            bts_index: the base station number
            mimo_mode: the new mimo mode

        Raises:
            KeyError: if mimo_mode is not in CMW_MIMO_MAPPING.
            RuntimeError: if the mapped mode has no known MIMO scenario.
        """
        bts = self.bts[bts_index]
        mimo_mode = CMW_MIMO_MAPPING[mimo_mode]

        scenarios = {
                cmw500.MimoModes.MIMO1x1: cmw500.MimoScenario.SCEN1x1,
                cmw500.MimoModes.MIMO2x2: cmw500.MimoScenario.SCEN2x2,
                cmw500.MimoModes.MIMO4x4: cmw500.MimoScenario.SCEN4x4,
        }
        if mimo_mode not in scenarios:
            raise RuntimeError('The requested MIMO mode is not supported.')

        self.cmw.configure_mimo_settings(scenarios[mimo_mode])
        bts.dl_antenna = mimo_mode

    def set_transmission_mode(self, bts_index, tmode):
        """ Sets the transmission mode for the indicated base station.

        The mode is only applied when it is accepted for the base station's
        current downlink antenna (MIMO) setting.

        Args:
            bts_index: the base station number
            tmode: the new transmission mode

        Raises:
            KeyError: if tmode is not in CMW_TM_MAPPING.
            ValueError: if the mode is not accepted for the current MIMO mode.
        """
        bts = self.bts[bts_index]

        tmode = CMW_TM_MAPPING[tmode]

        # bts.dl_antenna is compared against the enum *values* here, i.e. it
        # is expected to read back as a raw value rather than an enum member.
        if (tmode in [
                cmw500.TransmissionModes.TM1, cmw500.TransmissionModes.TM7
        ] and bts.dl_antenna == cmw500.MimoModes.MIMO1x1.value):
            bts.transmode = tmode
        elif (tmode.value in cmw500.TransmissionModes.__members__
              and bts.dl_antenna == cmw500.MimoModes.MIMO2x2.value):
            bts.transmode = tmode
        elif (tmode in [
                cmw500.TransmissionModes.TM2, cmw500.TransmissionModes.TM3,
                cmw500.TransmissionModes.TM4, cmw500.TransmissionModes.TM9
        ] and bts.dl_antenna == cmw500.MimoModes.MIMO4x4.value):
            bts.transmode = tmode

        else:
            raise ValueError('Transmission modes should support the current '
                             'mimo mode')

    def set_scheduling_mode(self,
                            bts_index,
                            scheduling,
                            mcs_dl=None,
                            mcs_ul=None,
                            nrb_dl=None,
                            nrb_ul=None):
        """ Sets the scheduling mode for the indicated base station.

        The UL and DL modulations stored by set_ul_modulation() and
        set_dl_modulation() are applied here together with the RB
        configuration, so both must have been set before this call.

        Args:
            bts_index: the base station number.
            scheduling: the new scheduling mode.
            mcs_dl: Downlink MCS.
            mcs_ul: Uplink MCS.
            nrb_dl: Number of RBs for downlink.
            nrb_ul: Number of RBs for uplink.

        Raises:
            KeyError: if scheduling is not in CMW_SCH_MAPPING, or the stored
                modulation / given MCS has no entry in the MCS tables.
            ValueError: if the modulations were not set beforehand or a
                required parameter is missing.
        """
        bts = self.bts[bts_index]
        bts.reduced_pdcch = cmw500.ReducedPdcch.ON

        scheduling = CMW_SCH_MAPPING[scheduling]
        bts.scheduling_mode = scheduling

        if not (self.ul_modulation and self.dl_modulation):
            raise ValueError('Modulation should be set prior to scheduling '
                             'call')

        if scheduling == cmw500.SchedulingMode.RMC:

            # Both RB counts are required. The parentheses matter: without
            # them 'not' binds only to nrb_ul.
            if not (nrb_ul and nrb_dl):
                raise ValueError('nrb_ul and nrb dl should not be none')

            bts.rb_configuration_ul = (nrb_ul, self.ul_modulation, 'KEEP')
            self.log.info('ul rb configurations set to {}'.format(
                    bts.rb_configuration_ul))

            # Pause between the UL and DL configuration; presumably gives
            # the instrument time to apply the UL settings.
            time.sleep(1)

            self.log.debug('Setting rb configurations for down link')
            bts.rb_configuration_dl = (nrb_dl, self.dl_modulation, 'KEEP')
            self.log.info('dl rb configurations set to {}'.format(
                    bts.rb_configuration_dl))

        elif scheduling == cmw500.SchedulingMode.USERDEFINEDCH:

            # MCS 0 is a valid table key, so the MCS values are checked
            # against None rather than for truthiness.
            if (not (nrb_ul and nrb_dl) or mcs_dl is None
                        or mcs_ul is None):
                raise ValueError('All parameters are mandatory.')

            tbs = get_mcs_tbsi_map_ul[self.ul_modulation][mcs_ul]

            bts.rb_configuration_ul = (nrb_ul, 0, self.ul_modulation, tbs)
            self.log.info('ul rb configurations set to {}'.format(
                    bts.rb_configuration_ul))

            # Pause between the UL and DL configuration; presumably gives
            # the instrument time to apply the UL settings.
            time.sleep(1)

            # The 256-QAM table is used only when the DL modulation is Q256.
            if self.dl_modulation == cmw500.ModulationType.Q256:
                tbs = get_mcs_tbsi_map_for_256qam_dl[
                        self.dl_modulation][mcs_dl]
            else:
                tbs = get_mcs_tbsi_map_dl[self.dl_modulation][mcs_dl]

            bts.rb_configuration_dl = (nrb_dl, 0, self.dl_modulation, tbs)
            self.log.info('dl rb configurations set to {}'.format(
                    bts.rb_configuration_dl))

    def set_dl_modulation(self, bts_index, modulation):
        """ Sets the DL modulation for the indicated base station.

        This function does not actually configure the test equipment with this
        setting, but stores the value to be used later on when setting the
        scheduling type. This is because the CMW500 API only allows to set
        this parameters together.

        Args:
            bts_index: the base station number
            modulation: the new DL modulation
        """
        # Convert dl modulation type to CMW modulation type.
        self.dl_modulation = CMW_MODULATION_MAPPING[modulation]

        self.log.warning('Modulation config stored but not applied until '
                         'set_scheduling_mode called.')

    def set_ul_modulation(self, bts_index, modulation):
        """ Sets the UL modulation for the indicated base station.

        This function does not actually configure the test equipment with this
        setting, but stores the value to be used later on when setting the
        scheduling type. This is because the CMW500 API only allows to set
        this parameters together.

        Args:
            bts_index: the base station number
            modulation: the new UL modulation
        """

        # Convert ul modulation type to CMW modulation type.
        self.ul_modulation = CMW_MODULATION_MAPPING[modulation]

        self.log.warning('Modulation config stored but not applied until '
                         'set_scheduling_mode called.')

    def set_tbs_pattern_on(self, bts_index, tbs_pattern_on):
        """ Enables or disables TBS pattern in the indicated base station.

        Currently a no-op for this simulator.

        Args:
            bts_index: the base station number
            tbs_pattern_on: the new TBS pattern setting
        """
        # TODO (b/143918664): CMW500 doesn't have an equivalent setting.
        pass

    def set_cfi(self, bts_index, cfi):
        """ Sets the Channel Format Indicator for the indicated base station.

        Not implemented: only logs an error.

        Args:
            bts_index: the base station number
            cfi: the new CFI setting
        """
        # TODO (b/143497738): implement.
        self.log.error('Setting CFI is not yet implemented in the CMW500 '
                       'controller.')

    def set_paging_cycle(self, bts_index, cycle_duration):
        """ Sets the paging cycle duration for the indicated base station.

        Not implemented: only logs an error.

        Args:
            bts_index: the base station number
            cycle_duration: the new paging cycle duration in milliseconds
        """
        # TODO (b/146068532): implement.
        self.log.error('Setting the paging cycle duration is not yet '
                       'implemented in the CMW500 controller.')

    def set_phich_resource(self, bts_index, phich):
        """ Sets the PHICH Resource setting for the indicated base station.

        Not implemented: only logs an error.

        Args:
            bts_index: the base station number
            phich: the new PHICH resource setting
        """
        self.log.error('Configuring the PHICH resource setting is not yet '
                       'implemented in the CMW500 controller.')

    def lte_attach_secondary_carriers(self, ue_capability_enquiry):
        """ Activates the secondary carriers for CA. Requires the DUT to be
        attached to the primary carrier first.

        Args:
            ue_capability_enquiry: UE capability enquiry message to be sent to
        the UE before starting carrier aggregation.

        Raises:
            NotImplementedError: always; not implemented for this simulator.
        """
        raise NotImplementedError()

    def wait_until_attached(self, timeout=120):
        """ Waits until the DUT is attached to the primary carrier.

        Args:
            timeout: after this amount of time the method will raise a
                CellularSimulatorError exception. Default is 120 seconds.

        Raises:
            CellularSimulatorError: if the controller raises CmwError while
                waiting for the attached state.
        """
        try:
            self.cmw.wait_for_attached_state(timeout=timeout)
        except cmw500.CmwError:
            raise cc.CellularSimulatorError('The phone was not in '
                                            'attached state before '
                                            'the timeout period ended.')

    def wait_until_communication_state(self, timeout=120):
        """ Waits until the DUT is in Communication state.

        Args:
            timeout: after this amount of time the method will raise a
                CellularSimulatorError exception. Default is 120 seconds.

        Raises:
            CellularSimulatorError: if the controller raises CmwError while
                waiting for the RRC state.
        """
        try:
            self.cmw.wait_for_rrc_state(cmw500.LTE_CONN_RESP, timeout=timeout)
        except cmw500.CmwError:
            raise cc.CellularSimulatorError('The phone was not in '
                                            'Communication state before '
                                            'the timeout period ended.')

    def wait_until_idle_state(self, timeout=120):
        """ Waits until the DUT is in Idle state.

        Args:
            timeout: after this amount of time the method will raise a
                CellularSimulatorError exception. Default is 120 seconds.

        Raises:
            CellularSimulatorError: if the controller raises CmwError while
                waiting for the RRC state.
        """
        try:
            self.cmw.wait_for_rrc_state(cmw500.LTE_IDLE_RESP, timeout=timeout)
        except cmw500.CmwError:
            raise cc.CellularSimulatorError('The phone was not in '
                                            'Idle state before '
                                            'the timeout period ended.')

    def detach(self):
        """ Detaches the DUT by issuing the detach command on the CMW500. """
        self.cmw.detach()

    def stop(self):
        """ Stops current simulation. After calling this method, the simulator
        will need to be set up again.

        Raises:
            NotImplementedError: always; not implemented for this simulator.
        """
        raise NotImplementedError()

    def start_data_traffic(self):
        """ Starts transmitting data from the instrument to the DUT.

        Raises:
            NotImplementedError: always; not implemented for this simulator.
        """
        raise NotImplementedError()

    def stop_data_traffic(self):
        """ Stops transmitting data from the instrument to the DUT.

        Raises:
            NotImplementedError: always; not implemented for this simulator.
        """
        raise NotImplementedError()

    def send_sms(self, sms_message):
        """ Sends SMS message from the instrument to the DUT.

        Waits for the attached state before setting and sending the message.

        Args:
            sms_message: the message to send.
        """
        self.cmw.wait_for_attached_state()
        self.cmw.set_sms(sms_message)
        self.cmw.send_sms()