# Copyright 2022 - The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the 'License');
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an 'AS IS' BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Controller for a Keysight UXM callbox driven over SCPI sockets and SSH."""
import logging
import os
import re
import socket
import time

import paramiko

from acts.controllers.cellular_simulator import AbstractCellularSimulator


class SocketWrapper():
    """A wrapper for socket communication with test equipment.

    The connection is opened lazily by the first command that is sent and
    can be re-opened after close() simply by sending another command.

    Attributes:
        _socket: a socket object, or None while disconnected.
        _ip: a string value for ip address
            which we want to connect.
        _port: an integer for port
            which we want to connect.
        _connecting_timeout: an integer for socket connecting timeout.
        _encode_format: a string specify encoding format.
        _cmd_terminator: a string indicates the end of command/data
            which need to be sent.
        _buff_size: maximum number of bytes read by one receive call.
        _logger: module level logger.
    """

    def __init__(self, ip, port,
                 connecting_timeout=120,
                 cmd_terminator='\n',
                 encode_format='utf-8',
                 buff_size=1024):
        self._socket = None
        self._ip = ip
        self._port = port
        self._connecting_timeout = connecting_timeout
        self._cmd_terminator = cmd_terminator
        self._encode_format = encode_format
        self._buff_size = buff_size
        self._logger = logging.getLogger(__name__)

    def _connect(self):
        """Open the TCP connection to (_ip, _port)."""
        # create_connection() sets the timeout on the returned socket, so it
        # also bounds the later send/recv calls made through this wrapper.
        self._socket = socket.create_connection(
            (self._ip, self._port), timeout=self._connecting_timeout)

    def send_command(self, cmd: str):
        """Send a command, connecting first if necessary.

        Args:
            cmd: a string command to be sent. The command terminator is
                appended when a non-empty command does not end with it.
        """
        if not self._socket:
            self._connect()
        # endswith() also handles terminators longer than one character.
        if cmd and not cmd.endswith(self._cmd_terminator):
            cmd = cmd + self._cmd_terminator
        self._socket.sendall(cmd.encode(self._encode_format))

    def send_command_recv(self, cmd: str) -> str:
        """Send data and wait for response

        Args:
            cmd: a string command to be sent.

        Returns:
            a string response (at most one buffer of _buff_size bytes).

        Raises:
            socket.timeout: no response arrived in time; the socket is
                closed before the exception is re-raised.
        """
        self.send_command(cmd)
        try:
            raw_response = self._socket.recv(self._buff_size)
        except socket.timeout:
            self._logger.info('Socket timeout while receiving response.')
            self.close()
            raise
        return raw_response.decode(self._encode_format)

    def close(self):
        """Close the connection; a no-op when there is no open socket."""
        if self._socket is None:
            return
        self._socket.close()
        self._socket = None


class UXMCellularSimulator(AbstractCellularSimulator):
    """A cellular simulator for UXM callbox."""

    # Keys to obtain data from cell_info dictionary.
    KEY_CELL_NUMBER = "cell_number"
    KEY_CELL_TYPE = "cell_type"

    # UXM socket port
    UXM_SOCKET_PORT = 5125

    # UXM SCPI COMMAND
    SCPI_IMPORT_STATUS_QUERY_CMD = 'SYSTem:SCPI:IMPort:STATus?'
    SCPI_SYSTEM_ERROR_CHECK_CMD = 'SYST:ERR?\n'
    SCPI_CHECK_CONNECTION_CMD = '*IDN?\n'
    SCPI_DEREGISTER_UE_IMS = 'SYSTem:IMS:SERVer:UE:DERegister'
    # require: path to SCPI file
    SCPI_IMPORT_SCPI_FILE_CMD = 'SYSTem:SCPI:IMPort "{}"\n'
    # require: 1. cell type (E.g. NR5G), 2. cell number (E.g CELL1)
    SCPI_CELL_ON_CMD = 'BSE:CONFig:{}:{}:ACTive 1'
    SCPI_CELL_OFF_CMD = 'BSE:CONFig:{}:{}:ACTive 0'
    SCPI_GET_CELL_STATUS = 'BSE:STATus:{}:{}?'
    SCPI_RRC_RELEASE_LTE_CMD = 'BSE:FUNCtion:{}:{}:RELease:SEND'
    SCPI_RRC_RELEASE_NR_CMD = 'BSE:CONFig:{}:{}:RCONtrol:RRC:STARt RRELease'
    # require cell number
    SCPI_CREATE_DEDICATED_BEARER = 'BSE:FUNCtion:LTE:{}:NAS:EBID10:DEDicated:CREate'
    SCPI_CHANGE_SIM_NR_CMD = 'BSE:CONFig:NR5G:CELL1:SECurity:AUTHenticate:KEY:TYPE {}'
    SCPI_CHANGE_SIM_LTE_CMD = 'BSE:CONFig:LTE:SECurity:AUTHenticate:KEY {}'
    SCPI_SETTINGS_PRESET_CMD = 'SYSTem:PRESet:FULL'

    # UXM's Test Application recovery
    TA_BOOT_TIME = 100

    # ssh command
    SSH_START_GUI_APP_CMD_FORMAT = 'psexec -s -d -i 1 "{exe_path}"'
    SSH_CHECK_APP_RUNNING_CMD_FORMAT = 'tasklist | findstr /R {regex_app_name}'
    SSH_KILL_PROCESS_BY_NAME = 'taskkill /IM {process_name} /F'
    UXM_TEST_APP_NAME = 'TestApp.exe'

    # start process success regex. '.*' stands for the host name printed by
    # psexec between 'started on' and 'with process ID'.
    PSEXEC_PROC_STARTED_REGEX_FORMAT = r'started on .* with process ID {proc_id}'

    # HCCU default value
    HCCU_SOCKET_PORT = 4882
    # The setup name is sent as '#<number of digits in the length>
    # <length><name>'; see switch_HCCU_setup().
    HCCU_SCPI_CHANGE_SETUP_CMD = ':SYSTem:SETup:CONFig #{number_of_digit}{setup_name_len}{setup_name}'
    HCCU_SCPI_CHANGE_SCENARIO_CMD = ':SETup:SCENe "((NE_1, {scenario_name}))"'
    HCCU_STATUS_CHECK_CMD = ':SETup:INSTrument:STATus? 0\n'
    HCCU_FR2_SETUP_NAME = '{Name:"TSPC_1UXM5G_HF_2RRH_M1740A"}'
    HCCU_FR1_SETUP_NAME = '{Name:"TSPC_1UXM5G_LF"}'
    HCCU_GET_INSTRUMENT_COUNT_CMD = ':SETup:INSTrument:COUNt?'
    HCCU_FR2_INSTRUMENT_COUNT = 5
    HCCU_FR1_INSTRUMENT_COUNT = 2
    HCCU_FR2_SCENARIO = 'NR_4DL2x2_2UL2x2_LTE_4CC'
    HCCU_FR1_SCENARIO = 'NR_1DL4x4_1UL2x2_LTE_4CC'

    def __init__(self, ip_address, custom_files, uxm_user,
                 ssh_private_key_to_uxm, ta_exe_path, ta_exe_name):
        """Initializes the cellular simulator.

        Opens an SSH session to the host, makes sure the Test Application
        is running, then connects the SCPI socket.

        Args:
            ip_address: the ip address of host where Keysight Test Application (TA)
                is installed.
            custom_files: a list of file path for custom files.
            uxm_user: username of host where Keysight TA resides.
            ssh_private_key_to_uxm: private key for key based ssh to
                host where Keysight TA resides.
            ta_exe_path: path to TA exe.
            ta_exe_name: name of TA exe.
        """
        super().__init__()
        self.custom_files = custom_files
        self.rockbottom_script = None
        self.cells = []
        self.uxm_ip = ip_address
        self.uxm_user = uxm_user
        self.ssh_private_key_to_uxm = os.path.expanduser(
            ssh_private_key_to_uxm)
        self.ta_exe_path = ta_exe_path
        self.ta_exe_name = ta_exe_name
        self.ssh_client = self.create_ssh_client()

        # get rockbottom file; when several paths match, the last one wins
        for file_path in self.custom_files:
            if 'rockbottom_' in file_path:
                self.rockbottom_script = file_path

        # connect to Keysight Test Application via socket
        self.recovery_ta()
        self.socket = self._socket_connect(self.uxm_ip, self.UXM_SOCKET_PORT)
        self.check_socket_connection()
        # upper bound on receive attempts in _socket_receive_SCPI_result()
        self.timeout = 120

        # hccu socket
        self.hccu_socket_port = self.HCCU_SOCKET_PORT
        self.hccu_socket = SocketWrapper(self.uxm_ip, self.hccu_socket_port)

    def socket_connect(self):
        """(Re)connect the SCPI socket to the Test Application."""
        self.socket = self._socket_connect(self.uxm_ip, self.UXM_SOCKET_PORT)

    def switch_HCCU_scenario(self, scenario_name: str):
        """Change HCCU scenario.

        Args:
            scenario_name: a string name
                of the scenario to switch to.
        """
        cmd = self.HCCU_SCPI_CHANGE_SCENARIO_CMD.format(
            scenario_name=scenario_name)
        self.hccu_socket.send_command(cmd)
        self.log.debug(f'Sent command: {cmd}')
        # this is require for the command to take effect
        # because hccu's port need to be free.
        self.hccu_socket.close()

    def switch_HCCU_setup(self, setup_name: str):
        """Change HCCU system setup.

        Args:
            setup_name: a string name
                of the system setup will be changed to.
        """
        # The name is prefixed with its length and with the number of
        # digits that length takes, e.g. '#215' for a 15 character name.
        setup_name_len = str(len(setup_name))
        number_of_digit = str(len(setup_name_len))
        cmd = self.HCCU_SCPI_CHANGE_SETUP_CMD.format(
            number_of_digit=number_of_digit,
            setup_name_len=setup_name_len,
            setup_name=setup_name)
        self.hccu_socket.send_command(cmd)
        self.log.debug(f'Sent command: {cmd}')
        # this is require for the command to take effect
        # because hccu's port need to be free.
        self.hccu_socket.close()

    def wait_until_hccu_operational(self, timeout=1200):
        """ Wait for hccu is ready to operate for a specified timeout.

        The status is polled every 10 seconds. Only the sleeps are counted
        against the timeout, not the time spent waiting for each response.

        Args:
            timeout: time we are waiting for
                hccu in operational status.

        Returns:
            True if HCCU status is operational within timeout.
            False otherwise.
        """
        # check status
        self.log.info('Waiting for HCCU to ready to operate.')
        cmd = self.HCCU_STATUS_CHECK_CMD
        interval = 10
        elapsed = 0
        while elapsed < timeout:
            response = self.hccu_socket.send_command_recv(cmd)
            if response == 'OPER\n':
                return True
            time.sleep(interval)
            elapsed += interval
        return False

    def switch_HCCU_settings(self, is_fr2: bool):
        """Set HCCU setup configuration.

        HCCU stands for Hardware Configuration Control Utility,
        an interface allows us to control Keysight Test Equipment.

        Nothing is changed when the reported instrument count already
        matches the requested setup. Otherwise the TA is terminated, the
        setup and scenario are switched, and the TA is started again.

        Args:
            is_fr2: True to select the FR2 setup, False for FR1.

        Raises:
            RuntimeError: HCCU did not become operational after switching
                the setup or the scenario.
        """
        instrument_count_res = self.hccu_socket.send_command_recv(
            self.HCCU_GET_INSTRUMENT_COUNT_CMD)
        instrument_count = int(instrument_count_res)
        # if hccu setup is correct, no need to change.
        if is_fr2 and instrument_count == self.HCCU_FR2_INSTRUMENT_COUNT:
            self.log.info('UXM has correct HCCU setup.')
            return
        if not is_fr2 and instrument_count == self.HCCU_FR1_INSTRUMENT_COUNT:
            self.log.info('UXM has correct HCCU setup.')
            return

        self.log.info('UXM has incorrect HCCU setup, start changing setup.')
        # terminate TA and close socket
        self.log.info('Terminate TA before switch HCCU settings.')
        self.terminate_process(self.UXM_TEST_APP_NAME)
        self.socket.close()

        # change hccu setup
        if is_fr2:
            setup_name = self.HCCU_FR2_SETUP_NAME
            scenario_name = self.HCCU_FR2_SCENARIO
        else:
            setup_name = self.HCCU_FR1_SETUP_NAME
            scenario_name = self.HCCU_FR1_SCENARIO
        self.log.info('Switch HCCU setup.')
        self.switch_HCCU_setup(setup_name)
        time.sleep(10)
        if not self.wait_until_hccu_operational():
            raise RuntimeError('Fail to switch HCCU setup.')

        # change scenario
        self.log.info('Ativate HCCU scenario.')
        self.switch_HCCU_scenario(scenario_name)
        time.sleep(40)
        if not self.wait_until_hccu_operational():
            raise RuntimeError('Fail to switch HCCU scenario.')

        # start TA and reconnect socket.
        self.recovery_ta()
        self.socket = self._socket_connect(self.uxm_ip, self.UXM_SOCKET_PORT)

    def create_ssh_client(self):
        """Create a ssh client to host.

        Returns:
            a connected paramiko.SSHClient authenticated with the Ed25519
            private key at self.ssh_private_key_to_uxm.
        """
        ssh = paramiko.SSHClient()
        # Unknown host keys are accepted automatically.
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        mykey = paramiko.Ed25519Key.from_private_key_file(
            self.ssh_private_key_to_uxm)
        ssh.connect(hostname=self.uxm_ip, username=self.uxm_user, pkey=mykey)
        self.log.info('SSH client to %s is connected' % self.uxm_ip)
        return ssh

    def terminate_process(self, process_name):
        """Kill a process on the host by image name via ssh.

        Args:
            process_name: image name of the process (e.g. TestApp.exe).

        Returns:
            the standard output of the kill command.
        """
        cmd = self.SSH_KILL_PROCESS_BY_NAME.format(
            process_name=process_name)
        stdin, stdout, stderr = self.ssh_client.exec_command(cmd)
        stdin.close()
        err = ''.join(stderr.readlines())
        out = ''.join(stdout.readlines())
        final_output = str(out) + str(err)
        self.log.info(final_output)
        return out

    def is_ta_running(self):
        """Check via ssh whether the Test Application process exists.

        Returns:
            True if the task list search printed a match and nothing was
            written to stderr, False otherwise.
        """
        is_running_cmd = self.SSH_CHECK_APP_RUNNING_CMD_FORMAT.format(
            regex_app_name=self.ta_exe_name)
        stdin, stdout, stderr = self.ssh_client.exec_command(is_running_cmd)
        stdin.close()
        err = ''.join(stderr.readlines())
        out = ''.join(stdout.readlines())
        final_output = str(out) + str(err)
        self.log.info(final_output)
        return out != '' and err == ''

    def _start_test_app(self):
        """Start Test Application on Windows.

        Raises:
            RuntimeError: psexec did not report that the process started.
        """
        # start GUI exe via ssh
        start_app_cmd = self.SSH_START_GUI_APP_CMD_FORMAT.format(
            exe_path=self.ta_exe_path)
        stdin, stdout, stderr = self.ssh_client.exec_command(start_app_cmd)
        self.log.info(f'Command sent to {self.uxm_ip}: {start_app_cmd}')
        stdin.close()
        err = ''.join(stderr.readlines())
        out = ''.join(stdout.readlines())
        # psexec return process ID as part of the exit code
        exit_status = stderr.channel.recv_exit_status()
        started_pattern = self.PSEXEC_PROC_STARTED_REGEX_FORMAT.format(
            proc_id=exit_status)
        # Search both streams in full: the confirmation line is not
        # guaranteed to be the very last thing psexec prints.
        if not re.search(started_pattern, out + err):
            raise RuntimeError('Fail to start TA: ' + out + err)
        # wait for ta completely boot up
        self.log.info('TA is starting')
        time.sleep(self.TA_BOOT_TIME)

    def recovery_ta(self):
        """Start TA if it is not running.

        Raises:
            RuntimeError: the TA socket port still refuses connections
                after all retries.
        """
        if not self.is_ta_running():
            self._start_test_app()
        # checking if ta booting process complete
        # by checking socket connection
        retries = 12
        for _ in range(retries):
            try:
                probe = self._socket_connect(self.uxm_ip, self.UXM_SOCKET_PORT)
            except ConnectionRefusedError:
                self.log.info(
                    'Connection refused, wait 10s for TA to boot')
                time.sleep(10)
            else:
                probe.close()
                return
        raise RuntimeError('TA does not start on time')

    def set_rockbottom_script_path(self, path):
        """Set path to rockbottom script.

        Args:
            path: path to rockbottom script.
        """
        self.rockbottom_script = path

    def set_cell_info(self, cell_info):
        """Set type and number for multiple cells.

        Args:
            cell_info: list of dictionaries,
                each dictionary contain cell type
                and cell number for each cell
                that the simulator need to control.

        Raises:
            ValueError: cell_info is empty or None.
        """
        if not cell_info:
            raise ValueError('Missing cell info from configurations file')
        self.cells = cell_info

    def deregister_ue_ims(self):
        """Remove UE IMS profile from UXM."""
        self._socket_send_SCPI_command(
            self.SCPI_DEREGISTER_UE_IMS)

    def create_dedicated_bearer(self):
        """Create a dedicated bearer setup for ims call.

        After UE connected and register on UXM IMS tab.
        It is required to create a dedicated bearer setup
        with EPS bearer ID 10. The cell number of the first
        configured cell is used.
        """
        cell_number = self.cells[0][self.KEY_CELL_NUMBER]
        self._socket_send_SCPI_command(
            self.SCPI_CREATE_DEDICATED_BEARER.format(cell_number))

    def turn_cell_on(self, cell_type, cell_number):
        """Turn UXM's cell on.

        Args:
            cell_type: type of cell (e.g NR5G, LTE).
            cell_number: ordinal number of a cell.

        Raises:
            ValueError: cell_type or cell_number is empty.
        """
        if not cell_type or not cell_number:
            raise ValueError('Invalid cell info\n' +
                             f' cell type: {cell_type}\n' +
                             f' cell number: {cell_number}\n')
        self._socket_send_SCPI_command(
            self.SCPI_CELL_ON_CMD.format(cell_type, cell_number))

    def turn_cell_off(self, cell_type, cell_number):
        """Turn UXM's cell off.

        Args:
            cell_type: type of cell (e.g NR5G, LTE).
            cell_number: ordinal number of a cell.

        Raises:
            ValueError: cell_type or cell_number is empty.
        """
        if not cell_type or not cell_number:
            raise ValueError('Invalid cell info\n' +
                             f' cell type: {cell_type}\n' +
                             f' cell number: {cell_number}\n')
        self._socket_send_SCPI_command(
            self.SCPI_CELL_OFF_CMD.format(cell_type, cell_number))

    def get_cell_status(self, cell_type, cell_number):
        """Get status of cell.

        Args:
            cell_type: type of cell (e.g NR5G, LTE).
            cell_number: ordinal number of a cell.

        Returns:
            the raw response string; callers in this class compare it
            against values such as 'CONN\\n', 'IDLE\\n' and 'OFF\\n'.

        Raises:
            ValueError: cell_type or cell_number is empty.
        """
        if not cell_type or not cell_number:
            raise ValueError('Invalid cell with\n' +
                             f' cell type: {cell_type}\n' +
                             f' cell number: {cell_number}\n')

        return self._socket_send_SCPI_for_result_command(
            self.SCPI_GET_CELL_STATUS.format(cell_type, cell_number))

    def check_socket_connection(self):
        """Check if the socket connection is established.

        Query the identification of the Keysight Test Application
        we are trying to connect to. Empty response indicates
        connection fail, and vice versa. The outcome is only logged.
        """
        self.socket.sendall(self.SCPI_CHECK_CONNECTION_CMD.encode())
        response = self.socket.recv(1024).decode()
        if response:
            self.log.info(f'Connected to: {response}')
        else:
            self.log.error('Fail to connect to callbox')

    def _socket_connect(self, host, port):
        """Create socket connection.

        Args:
            host: IP address of desktop where Keysight Test Application resides.
            port: port that Keysight Test Application is listening for socket
                communication.
        Returns:
            s: socket object.

        Raises:
            OSError: the connection attempt failed (for example
                ConnectionRefusedError while the TA is still booting).
        """
        self.log.info('Establishing connection to callbox via socket')
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            s.connect((host, port))
        except OSError:
            # Release the descriptor right away: recovery_ta() retries in a
            # loop and would otherwise leave unconnected sockets behind.
            s.close()
            raise
        return s

    def _socket_send_SCPI_command(self, command):
        """Send SCPI command without expecting response.

        The line break is appended unconditionally, so a command that
        already ends with one is sent with two.

        Args:
            command: a string SCPI command.
        """
        # make sure there is a line break for the socket to send command
        command = command + '\n'
        # send command
        self.socket.sendall(command.encode())
        self.log.info(f'Sent {command}')

    def _socket_receive_SCPI_result(self):
        """Receive response from socket.

        Returns:
            the first non-empty chunk read from the socket, or an empty
            string if every attempt returned no data. self.timeout limits
            the number of receive attempts; it is not a duration.
        """
        attempt = 1
        response = ''
        while attempt < self.timeout and not response:
            response = self.socket.recv(1024).decode()
            attempt += 1
        return response

    def _socket_send_SCPI_for_result_command(self, command):
        """Send SCPI command and expecting response.

        Args:
            command: a string SCPI command.

        Returns:
            the response string.
        """
        self._socket_send_SCPI_command(command)
        return self._socket_receive_SCPI_result()

    def check_system_error(self):
        """Query system error from Keysight Test Application.

        Returns:
            status: a message indicate the number of errors
                and detail of errors if any.
                a string `0,"No error"` indicates no error.
        """
        status = self._socket_send_SCPI_for_result_command(
            self.SCPI_SYSTEM_ERROR_CHECK_CMD)
        self.log.info(f'System error status: {status}')
        return status

    def import_configuration(self, path):
        """Import SCPI config file.

        The settings are preset first, then the file is imported. Fixed
        sleeps follow each command; completion is not queried.

        Args:
            path: path to SCPI file.
        """
        self._socket_send_SCPI_command(
            self.SCPI_SETTINGS_PRESET_CMD)
        time.sleep(10)
        self._socket_send_SCPI_command(
            self.SCPI_IMPORT_SCPI_FILE_CMD.format(path))
        time.sleep(45)

    def destroy(self):
        """Close the connections held by this simulator.

        Closes the SCPI socket, the HCCU socket and the SSH client.
        """
        self.socket.close()
        self.hccu_socket.close()
        self.ssh_client.close()

    def setup_lte_scenario(self, path):
        """Configures the equipment for an LTE simulation.

        Args:
            path: path to SCPI config file.
        """
        self.import_configuration(path)

    def dut_rockbottom(self, dut):
        """Set the dut to rockbottom state.

        Args:
            dut: a CellularAndroid controller.
        """
        # The rockbottom script might include a device reboot, so it is
        # necessary to stop SL4A during its execution.
        dut.ad.stop_services()
        self.log.info('Executing rockbottom script for ' + dut.ad.model)
        os.chmod(self.rockbottom_script, 0o777)
        os.system('{} {}'.format(self.rockbottom_script, dut.ad.serial))
        # Make sure the DUT is in root mode after coming back
        dut.ad.root_adb()
        # Restart SL4A
        dut.ad.start_services()

    def set_sim_type(self, is_3gpp_sim):
        """Select the authentication key type for the NR and LTE cells.

        Args:
            is_3gpp_sim: True to use 'TEST3GPP', False to use 'KEYSight'.
        """
        sim_type = 'TEST3GPP' if is_3gpp_sim else 'KEYSight'
        self._socket_send_SCPI_command(
            self.SCPI_CHANGE_SIM_NR_CMD.format(sim_type))
        time.sleep(2)
        self._socket_send_SCPI_command(
            self.SCPI_CHANGE_SIM_LTE_CMD.format(sim_type))
        time.sleep(2)

    def wait_until_attached_one_cell(self,
                                     cell_type,
                                     cell_number,
                                     dut,
                                     wait_for_camp_interval,
                                     attach_retries):
        """Wait until connect to given UXM cell.

        After turn off airplane mode, sleep for
        wait_for_camp_interval seconds for device to camp.
        If not device is not connected after the wait,
        either toggle airplane mode on/off or reboot device.
        Args:
            cell_type: type of cell
                which we are trying to connect to.
            cell_number: ordinal number of a cell
                which we are trying to connect to.
            dut: a AndroidCellular controller.
            wait_for_camp_interval: sleep interval,
                wait for device to camp.
            attach_retries: number of retry
                to wait for device
                to connect to 1 basestation.
                The loop makes attach_retries - 1 attempts.
        Returns:
            True once the cell reports connected twice in a row.
        Raise:
            RuntimeError: device unable to connect to cell.
        """
        # airplane mode on
        dut.toggle_airplane_mode(True)
        time.sleep(5)

        # turn cell on
        self.turn_cell_on(cell_type, cell_number)
        time.sleep(5)

        interval = 10
        # waits for device to camp
        for index in range(1, attach_retries):
            count = 0
            # airplane mode off
            dut.toggle_airplane_mode(False)
            time.sleep(5)
            # check connection in small interval
            while count < wait_for_camp_interval:
                time.sleep(interval)
                cell_state = self.get_cell_status(cell_type, cell_number)
                self.log.info(f'cell state: {cell_state}')
                if cell_state == 'CONN\n':
                    # wait for connection stable
                    time.sleep(15)
                    # check connection status again
                    cell_state = self.get_cell_status(cell_type, cell_number)
                    self.log.info(f'cell state: {cell_state}')
                    if cell_state == 'CONN\n':
                        return True
                if cell_state == 'OFF\n':
                    self.turn_cell_on(cell_type, cell_number)
                    time.sleep(5)
                count += interval

            if (index % 2) == 0:
                # even attempts: reboot device
                dut.ad.reboot()
                if self.rockbottom_script:
                    self.dut_rockbottom(dut)
                else:
                    self.log.warning(
                        'Rockbottom script was not executed after reboot.')
            else:
                # odd attempts: toggle APM and cell on/off
                # Toggle APM on
                dut.toggle_airplane_mode(True)
                time.sleep(5)

                # Toggle simulator cell
                self.turn_cell_off(cell_type, cell_number)
                time.sleep(5)
                self.turn_cell_on(cell_type, cell_number)
                time.sleep(5)

                # Toggle APM off
                dut.toggle_airplane_mode(False)
                time.sleep(5)
            # increase length of small waiting interval
            interval += 5

        # Phone cannot connected to basestation of callbox
        raise RuntimeError(
            f'Phone was unable to connect to cell: {cell_type}-{cell_number}')

    def wait_until_attached(self, dut, timeout, attach_retries):
        """Waits until the DUT is attached to all required cells.

        With exactly two configured cells, the second cell is turned on and
        aggregated after the DUT attached to the first one.

        Args:
            dut: a CellularAndroid controller.
            timeout: sleep interval,
                wait for device to camp in 1 try.
            attach_retries: number of retry
                to wait for device
                to connect to 1 basestation.

        Raises:
            RuntimeError: the DUT could not attach to the first cell, or
                the second cell never reported connected.
        """
        # get cell info
        first_cell_type = self.cells[0][self.KEY_CELL_TYPE]
        first_cell_number = self.cells[0][self.KEY_CELL_NUMBER]
        has_second_cell = len(self.cells) == 2
        if has_second_cell:
            second_cell_type = self.cells[1][self.KEY_CELL_TYPE]
            second_cell_number = self.cells[1][self.KEY_CELL_NUMBER]

        # connect to 1st cell
        self.wait_until_attached_one_cell(first_cell_type,
                                          first_cell_number, dut, timeout,
                                          attach_retries)

        # aggregation to NR
        if has_second_cell:
            self.turn_cell_on(
                second_cell_type,
                second_cell_number,
            )

            for _ in range(1, attach_retries):
                self.log.info('Try to aggregate to NR.')
                self._socket_send_SCPI_command(
                    'BSE:CONFig:LTE:CELL1:CAGGregation:AGGRegate:NRCC:DL None')
                self._socket_send_SCPI_command(
                    'BSE:CONFig:LTE:CELL1:CAGGregation:AGGRegate:NRCC:UL None')
                self._socket_send_SCPI_command(
                    'BSE:CONFig:LTE:CELL1:CAGGregation:AGGRegate:NRCC:DL CELL1')
                # NOTE(review): the DL command is sent twice while UL was
                # reset to None above; possibly ':NRCC:UL CELL1' was
                # intended here. Confirm against the instrument.
                self._socket_send_SCPI_command(
                    'BSE:CONFig:LTE:CELL1:CAGGregation:AGGRegate:NRCC:DL CELL1')
                time.sleep(1)
                self._socket_send_SCPI_command(
                    "BSE:CONFig:LTE:CELL1:CAGGregation:AGGRegate:NRCC:APPly")
                # wait for status stable
                time.sleep(10)
                cell_state = self.get_cell_status(second_cell_type, second_cell_number)
                self.log.info(f'cell state: {cell_state}')
                if cell_state == 'CONN\n':
                    return

                # NOTE(review): the second cell is turned off here and is
                # not turned on again before the next attempt; verify that
                # this is intended.
                self.turn_cell_off(second_cell_type, second_cell_number)
                # wait for LTE cell to connect again
                self.wait_until_attached_one_cell(first_cell_type,
                                                  first_cell_number, dut, 120,
                                                  2)

            raise RuntimeError('Fail to aggregate to NR from LTE.')

    def set_lte_rrc_state_change_timer(self, enabled, time=10):
        """Configures the LTE RRC state change timer.

        Args:
            enabled: a boolean indicating if the timer should be on or off.
            time: time in seconds for the timer to expire.
        """
        raise NotImplementedError(
            'This UXM callbox simulator does not support this feature.')

    def set_band(self, bts_index, band):
        """Sets the band for the indicated base station.

        Args:
            bts_index: the base station number.
            band: the new band.
        """
        raise NotImplementedError(
            'This UXM callbox simulator does not support this feature.')

    def get_duplex_mode(self, band):
        """Determines if the band uses FDD or TDD duplex mode

        Args:
            band: a band number.

        Returns:
            an variable of class DuplexMode indicating if band is FDD or TDD.
        """
        raise NotImplementedError(
            'This UXM callbox simulator does not support this feature.')

    def set_input_power(self, bts_index, input_power):
        """Sets the input power for the indicated base station.

        Args:
            bts_index: the base station number.
            input_power: the new input power.
        """
        raise NotImplementedError(
            'This UXM callbox simulator does not support this feature.')

    def set_output_power(self, bts_index, output_power):
        """Sets the output power for the indicated base station.

        Args:
            bts_index: the base station number.
            output_power: the new output power.
        """
        raise NotImplementedError(
            'This UXM callbox simulator does not support this feature.')

    def set_tdd_config(self, bts_index, tdd_config):
        """Sets the tdd configuration number for the indicated base station.

        Args:
            bts_index: the base station number.
            tdd_config: the new tdd configuration number.
        """
        raise NotImplementedError(
            'This UXM callbox simulator does not support this feature.')

    def set_ssf_config(self, bts_index, ssf_config):
        """Sets the Special Sub-Frame config number for the indicated.

        base station.

        Args:
            bts_index: the base station number.
            ssf_config: the new ssf config number.
        """
        raise NotImplementedError(
            'This UXM callbox simulator does not support this feature.')

    def set_bandwidth(self, bts_index, bandwidth):
        """Sets the bandwidth for the indicated base station.

        Args:
            bts_index: the base station number
            bandwidth: the new bandwidth
        """
        raise NotImplementedError(
            'This UXM callbox simulator does not support this feature.')

    def set_downlink_channel_number(self, bts_index, channel_number):
        """Sets the downlink channel number for the indicated base station.

        Args:
            bts_index: the base station number.
            channel_number: the new channel number.
        """
        raise NotImplementedError(
            'This UXM callbox simulator does not support this feature.')

    def set_mimo_mode(self, bts_index, mimo_mode):
        """Sets the mimo mode for the indicated base station.

        Args:
            bts_index: the base station number
            mimo_mode: the new mimo mode
        """
        raise NotImplementedError(
            'This UXM callbox simulator does not support this feature.')

    def set_transmission_mode(self, bts_index, tmode):
        """Sets the transmission mode for the indicated base station.

        Args:
            bts_index: the base station number.
            tmode: the new transmission mode.
        """
        raise NotImplementedError(
            'This UXM callbox simulator does not support this feature.')

    def set_scheduling_mode(self,
                            bts_index,
                            scheduling,
                            mcs_dl=None,
                            mcs_ul=None,
                            nrb_dl=None,
                            nrb_ul=None):
        """Sets the scheduling mode for the indicated base station.

        Args:
            bts_index: the base station number.
            scheduling: the new scheduling mode.
            mcs_dl: Downlink MCS.
            mcs_ul: Uplink MCS.
            nrb_dl: Number of RBs for downlink.
            nrb_ul: Number of RBs for uplink.
        """
        raise NotImplementedError(
            'This UXM callbox simulator does not support this feature.')

    def set_dl_256_qam_enabled(self, bts_index, enabled):
        """Determines what MCS table should be used for the downlink.

        This only saves the setting that will be used when configuring MCS.

        Args:
            bts_index: the base station number.
            enabled: whether 256 QAM should be used.
        """
        raise NotImplementedError(
            'This UXM callbox simulator does not support this feature.')

    def set_ul_64_qam_enabled(self, bts_index, enabled):
        """Determines what MCS table should be used for the uplink.

        This only saves the setting that will be used when configuring MCS.

        Args:
            bts_index: the base station number.
            enabled: whether 64 QAM should be used.
        """
        raise NotImplementedError(
            'This UXM callbox simulator does not support this feature.')

    def set_mac_padding(self, bts_index, mac_padding):
        """Enables or disables MAC padding in the indicated base station.

        Args:
            bts_index: the base station number.
            mac_padding: the new MAC padding setting.
        """
        raise NotImplementedError(
            'This UXM callbox simulator does not support this feature.')

    def set_cfi(self, bts_index, cfi):
        """Sets the Channel Format Indicator for the indicated base station.

        Args:
            bts_index: the base station number.
            cfi: the new CFI setting.
        """
        raise NotImplementedError(
            'This UXM callbox simulator does not support this feature.')

    def set_paging_cycle(self, bts_index, cycle_duration):
        """Sets the paging cycle duration for the indicated base station.

        Args:
            bts_index: the base station number.
            cycle_duration: the new paging cycle duration in milliseconds.
        """
        raise NotImplementedError(
            'This UXM callbox simulator does not support this feature.')

    def set_phich_resource(self, bts_index, phich):
        """Sets the PHICH Resource setting for the indicated base station.

        Args:
            bts_index: the base station number.
            phich: the new PHICH resource setting.
        """
        raise NotImplementedError(
            'This UXM callbox simulator does not support this feature.')

    def lte_attach_secondary_carriers(self, ue_capability_enquiry):
        """Activates the secondary carriers for CA.

        Requires the DUT to be attached to the primary carrier first.

        Args:
            ue_capability_enquiry: UE capability enquiry message to be sent to
                the UE before starting carrier aggregation.
        """
        raise NotImplementedError(
            'This UXM callbox simulator does not support this feature.')

    def wait_until_communication_state(self, timeout=120):
        """Waits until the DUT is in Communication state.

        Args:
            timeout: after this amount of time the method will raise
                a CellularSimulatorError exception. Default is 120 seconds.
        """
        raise NotImplementedError(
            'This UXM callbox simulator does not support this feature.')

    def wait_until_idle_state(self, timeout=120):
        """Waits until the DUT is in Idle state.

        The state of the first configured cell is checked up to 5 times;
        while it reports connected, an RRC release is sent and the method
        waits 60 seconds before checking again.

        Args:
            timeout: not used by this implementation.

        Raises:
            RuntimeError: the cell did not report idle after 5 checks.
        """
        # turn on RRC release
        cell_type = self.cells[0][self.KEY_CELL_TYPE]
        cell_number = self.cells[0][self.KEY_CELL_NUMBER]

        # choose cmd base on cell type; anything but LTE uses the NR command
        if cell_type == 'LTE':
            cmd = self.SCPI_RRC_RELEASE_LTE_CMD
        else:
            cmd = self.SCPI_RRC_RELEASE_NR_CMD

        # checking status
        self.log.info('Wait for IDLE state.')
        for _ in range(5):
            cell_state = self.get_cell_status(cell_type, cell_number)
            self.log.info(f'cell state: {cell_state}')
            if cell_state == 'CONN\n':
                # RRC release
                self._socket_send_SCPI_command(cmd.format(cell_type, cell_number))
                # wait for status stable
                time.sleep(60)
            elif cell_state == 'IDLE\n':
                return

        raise RuntimeError('RRC release fail.')

    def detach(self):
        """ Turns off all the base stations so the DUT loose connection."""
        for cell in self.cells:
            cell_type = cell[self.KEY_CELL_TYPE]
            cell_number = cell[self.KEY_CELL_NUMBER]
            self.turn_cell_off(cell_type, cell_number)
            time.sleep(5)

    def stop(self):
        """Stops current simulation.

        After calling this method, the simulator will need to be set up again.
        """
        raise NotImplementedError(
            'This UXM callbox simulator does not support this feature.')

    def start_data_traffic(self):
        """Starts transmitting data from the instrument to the DUT. """
        raise NotImplementedError(
            'This UXM callbox simulator does not support this feature.')

    def stop_data_traffic(self):
        """Stops transmitting data from the instrument to the DUT. """
        raise NotImplementedError(
            'This UXM callbox simulator does not support this feature.')