1# Copyright 2022 - The Android Open Source Project 2# 3# Licensed under the Apache License, Version 2.0 (the 'License'); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an 'AS IS' BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14import re 15import logging 16import os 17import paramiko 18import socket 19import time 20 21from acts.controllers.cellular_simulator import AbstractCellularSimulator 22 23class SocketWrapper(): 24 """A wrapper for socket communicate with test equipment. 25 26 Attributes: 27 _socket: a socket object. 28 _ip: a string value for ip address 29 which we want to connect. 30 _port: an integer for port 31 which we want to connect. 32 _connecting_timeout: an integer for socket connecting timeout. 33 _encode_format: a string specify encoding format. 34 _cmd_terminator: a character indicates the end of command/data 35 which need to be sent. 36 """ 37 38 def __init__(self, ip, port, 39 connecting_timeout=120, 40 cmd_terminator='\n', 41 encode_format='utf-8', 42 buff_size=1024): 43 self._socket = None 44 self._ip = ip 45 self._port = port 46 self._connecting_timeout = connecting_timeout 47 self._cmd_terminator = cmd_terminator 48 self._encode_format = encode_format 49 self._buff_size = buff_size 50 self._logger = logging.getLogger(__name__) 51 52 def _connect(self): 53 self._socket = socket.create_connection( 54 (self._ip, self._port), timeout=self._connecting_timeout 55 ) 56 57 def send_command(self, cmd: str): 58 if not self._socket: 59 self._connect() 60 if cmd and cmd[-1] != self._cmd_terminator: 61 cmd = cmd + self._cmd_terminator 62 self._socket.sendall(cmd.encode(self._encode_format)) 63 64 def send_command_recv(self, cmd: str) -> str: 65 """Send data and wait for response 66 67 Args: 68 cmd: a string command to be sent. 69 70 Returns: 71 a string response. 72 """ 73 self.send_command(cmd) 74 response = '' 75 try: 76 response = self._socket.recv(self._buff_size).decode( 77 self._encode_format 78 ) 79 except socket.timeout as e: 80 self._logger.info('Socket timeout while receiving response.') 81 self.close() 82 raise 83 84 return response 85 86 def close(self): 87 self._socket.close() 88 self._socket = None 89 90class UXMCellularSimulator(AbstractCellularSimulator): 91 """A cellular simulator for UXM callbox.""" 92 93 # Keys to obtain data from cell_info dictionary. 94 _KEY_CELL_NUMBER = "cell_number" 95 _KEY_CELL_TYPE = "cell_type" 96 97 # UXM socket port 98 UXM_SOCKET_PORT = 5125 99 100 # UXM SCPI COMMAND 101 SCPI_IMPORT_STATUS_QUERY_CMD = 'SYSTem:SCPI:IMPort:STATus?' 102 SCPI_SYSTEM_ERROR_CHECK_CMD = 'SYST:ERR?\n' 103 SCPI_CHECK_CONNECTION_CMD = '*IDN?\n' 104 SCPI_DEREGISTER_UE_IMS = 'SYSTem:IMS:SERVer:UE:DERegister' 105 _SCPI_CHANGE_DL_TDOMAIN = 'BSE:CONFig:NR5G:CELL1:SCHeduling:BWP0:FC0:SC0:DL:TDOMain:APOLicy ONMac' 106 _SCPI_CHANGE_UL_TDOMAIN = 'BSE:CONFig:NR5G:CELL1:SCHeduling:BWP0:FC0:SC0:UL:NUL:TDOMain:APOLicy ONSRbsr' 107 # require: path to SCPI file 108 SCPI_IMPORT_SCPI_FILE_CMD = 'SYSTem:SCPI:IMPort "{}"\n' 109 # require: 1. cell type (E.g. NR5G), 2. cell number (E.g CELL1) 110 SCPI_CELL_ON_CMD = 'BSE:CONFig:{}:{}:ACTive 1' 111 SCPI_CELL_OFF_CMD = 'BSE:CONFig:{}:{}:ACTive 0' 112 SCPI_GET_CELL_STATUS = 'BSE:STATus:{}:{}?' 113 SCPI_RRC_RELEASE_LTE_CMD = 'BSE:FUNCtion:{}:{}:RELease:SEND' 114 SCPI_RRC_RELEASE_NR_CMD = 'BSE:CONFig:{}:{}:RCONtrol:RRC:STARt RRELease' 115 # require cell number 116 SCPI_CREATE_DEDICATED_BEARER = 'BSE:FUNCtion:LTE:{}:NAS:EBID10:DEDicated:CREate' 117 SCPI_CHANGE_SIM_NR_CMD = 'BSE:CONFig:NR5G:CELL1:SECurity:AUTHenticate:KEY:TYPE {}' 118 SCPI_CHANGE_SIM_LTE_CMD = 'BSE:CONFig:LTE:SECurity:AUTHenticate:KEY {}' 119 SCPI_SETTINGS_PRESET_CMD = 'SYSTem:PRESet:FULL' 120 121 # UXM's Test Application recovery 122 TA_BOOT_TIME = 100 123 124 # shh command 125 SSH_START_GUI_APP_CMD_FORMAT = 'psexec -s -d -i 1 "{exe_path}"' 126 SSH_CHECK_APP_RUNNING_CMD_FORMAT = 'tasklist | findstr /R {regex_app_name}' 127 SSH_KILL_PROCESS_BY_NAME = 'taskkill /IM {process_name} /F' 128 UXM_TEST_APP_NAME = 'TestApp.exe' 129 130 # start process success regex 131 PSEXEC_PROC_STARTED_REGEX_FORMAT = 'started on * with process ID {proc_id}' 132 133 # HCCU default value 134 HCCU_SOCKET_PORT = 4882 135 # number of digit of the length of setup name 136 HCCU_SCPI_CHANGE_SETUP_CMD = ':SYSTem:SETup:CONFig #{number_of_digit}{setup_name_len}{setup_name}' 137 HCCU_SCPI_CHANGE_SCENARIO_CMD = ':SETup:SCENe "((NE_1, {scenario_name}))"' 138 HCCU_STATUS_CHECK_CMD = ':SETup:INSTrument:STATus? 0\n' 139 HCCU_FR2_SETUP_NAME = '{Name:"TSPC_1UXM5G_HF_2RRH_M1740A"}' 140 HCCU_FR1_SETUP_NAME = '{Name:"TSPC_1UXM5G_LF"}' 141 HCCU_GET_INSTRUMENT_COUNT_CMD = ':SETup:INSTrument:COUNt?' 142 HCCU_FR2_INSTRUMENT_COUNT = 5 143 HCCU_FR1_INSTRUMENT_COUNT = 2 144 HCCU_FR2_SCENARIO = 'NR_4DL2x2_2UL2x2_LTE_4CC' 145 HCCU_FR1_SCENARIO = 'NR_1DL4x4_1UL2x2_LTE_4CC' 146 147 148 def __init__(self, ip_address, custom_files,uxm_user, 149 ssh_private_key_to_uxm, ta_exe_path, ta_exe_name): 150 """Initializes the cellular simulator. 151 152 Args: 153 ip_address: the ip address of host where Keysight Test Application (TA) 154 is installed. 155 custom_files: a list of file path for custom files. 156 uxm_user: username of host where Keysight TA resides. 157 ssh_private_key_to_uxm: private key for key based ssh to 158 host where Keysight TA resides. 159 ta_exe_path: path to TA exe. 160 ta_exe_name: name of TA exe. 161 """ 162 super().__init__() 163 self.custom_files = custom_files 164 self.rockbottom_script = None 165 self.cells = [] 166 self.uxm_ip = ip_address 167 self.uxm_user = uxm_user 168 self.ssh_private_key_to_uxm = os.path.expanduser( 169 ssh_private_key_to_uxm) 170 self.ta_exe_path = ta_exe_path 171 self.ta_exe_name = ta_exe_name 172 self.ssh_client = self.create_ssh_client() 173 174 # get roclbottom file 175 for file in self.custom_files: 176 if 'rockbottom_' in file: 177 self.rockbottom_script = file 178 179 # connect to Keysight Test Application via socket 180 self.recovery_ta() 181 self.socket = self._socket_connect(self.uxm_ip, self.UXM_SOCKET_PORT) 182 self.check_socket_connection() 183 self.timeout = 120 184 185 # hccu socket 186 self.hccu_socket_port = self.HCCU_SOCKET_PORT 187 self.hccu_socket = SocketWrapper(self.uxm_ip, self.hccu_socket_port) 188 189 def socket_connect(self): 190 self.socket = self._socket_connect(self.uxm_ip, self.UXM_SOCKET_PORT) 191 192 def switch_HCCU_scenario(self, scenario_name: str): 193 cmd = self.HCCU_SCPI_CHANGE_SCENARIO_CMD.format( 194 scenario_name=scenario_name) 195 self.hccu_socket.send_command(cmd) 196 self.log.debug(f'Sent command: {cmd}') 197 # this is require for the command to take effect 198 # because hccu's port need to be free. 199 self.hccu_socket.close() 200 201 def switch_HCCU_setup(self, setup_name: str): 202 """Change HHCU system setup. 203 204 Args: 205 setup_name: a string name 206 of the system setup will be changed to. 207 """ 208 setup_name_len = str(len(setup_name)) 209 number_of_digit = str(len(setup_name_len)) 210 cmd = self.HCCU_SCPI_CHANGE_SETUP_CMD.format( 211 number_of_digit=number_of_digit, 212 setup_name_len=setup_name_len, 213 setup_name=setup_name 214 ) 215 self.hccu_socket.send_command(cmd) 216 self.log.debug(f'Sent command: {cmd}') 217 # this is require for the command to take effect 218 # because hccu's port need to be free. 219 self.hccu_socket.close() 220 221 def wait_until_hccu_operational(self, timeout=1200): 222 """ Wait for hccu is ready to operate for a specified timeout. 223 224 Args: 225 timeout: time we are waiting for 226 hccu in opertional status. 227 228 Returns: 229 True if HCCU status is operational within timeout. 230 False otherwise. 231 """ 232 # check status 233 self.log.info('Waiting for HCCU to ready to operate.') 234 cmd = self.HCCU_STATUS_CHECK_CMD 235 t = 0 236 interval = 10 237 while t < timeout: 238 response = self.hccu_socket.send_command_recv(cmd) 239 if response == 'OPER\n': 240 return True 241 time.sleep(interval) 242 t += interval 243 return False 244 245 def switch_HCCU_settings(self, is_fr2: bool): 246 """Set HCCU setup configuration. 247 248 HCCU stands for Hardware Configuration Control Utility, 249 an interface allows us to control Keysight Test Equipment. 250 251 Args: 252 is_fr2: a bool value. 253 """ 254 # change HCCU configration 255 data = '' 256 scenario_name = '' 257 instrument_count_res = self.hccu_socket.send_command_recv( 258 self.HCCU_GET_INSTRUMENT_COUNT_CMD) 259 instrument_count = int(instrument_count_res) 260 # if hccu setup is correct, no need to change. 261 if is_fr2 and instrument_count == self.HCCU_FR2_INSTRUMENT_COUNT: 262 self.log.info('UXM has correct HCCU setup.') 263 return 264 if not is_fr2 and instrument_count == self.HCCU_FR1_INSTRUMENT_COUNT: 265 self.log.info('UXM has correct HCCU setup.') 266 return 267 268 self.log.info('UXM has incorrect HCCU setup, start changing setup.') 269 # terminate TA and close socket 270 self.log.info('Terminate TA before switch HCCU settings.') 271 self.terminate_process(self.UXM_TEST_APP_NAME) 272 self.socket.close() 273 274 # change hccu setup 275 if is_fr2: 276 data = self.HCCU_FR2_SETUP_NAME 277 scenario_name = self.HCCU_FR2_SCENARIO 278 else: 279 data = self.HCCU_FR1_SETUP_NAME 280 scenario_name = self.HCCU_FR1_SCENARIO 281 self.log.info('Switch HCCU setup.') 282 self.switch_HCCU_setup(data) 283 time.sleep(10) 284 if not self.wait_until_hccu_operational(): 285 raise RuntimeError('Fail to switch HCCU setup.') 286 287 # change scenario 288 self.log.info('Ativate HCCU scenario.') 289 self.switch_HCCU_scenario(scenario_name) 290 time.sleep(40) 291 if not self.wait_until_hccu_operational(): 292 raise RuntimeError('Fail to switch HCCU scenario.') 293 294 # start TA and reconnect socket. 295 self.recovery_ta() 296 self.socket = self._socket_connect(self.uxm_ip, self.UXM_SOCKET_PORT) 297 298 def create_ssh_client(self): 299 """Create a ssh client to host.""" 300 ssh = paramiko.SSHClient() 301 ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) 302 mykey = paramiko.Ed25519Key.from_private_key_file( 303 self.ssh_private_key_to_uxm) 304 ssh.connect(hostname=self.uxm_ip, username=self.uxm_user, pkey=mykey) 305 self.log.info('SSH client to %s is connected' % self.uxm_ip) 306 return ssh 307 308 def terminate_process(self, process_name): 309 cmd = self.SSH_KILL_PROCESS_BY_NAME.format( 310 process_name=process_name 311 ) 312 stdin, stdout, stderr = self.ssh_client.exec_command(cmd) 313 stdin.close() 314 err = ''.join(stderr.readlines()) 315 out = ''.join(stdout.readlines()) 316 final_output = str(out) + str(err) 317 self.log.info(final_output) 318 return out 319 320 def is_ta_running(self): 321 is_running_cmd = self.SSH_CHECK_APP_RUNNING_CMD_FORMAT.format( 322 regex_app_name=self.ta_exe_name) 323 stdin, stdout, stderr = self.ssh_client.exec_command(is_running_cmd) 324 stdin.close() 325 err = ''.join(stderr.readlines()) 326 out = ''.join(stdout.readlines()) 327 final_output = str(out) + str(err) 328 self.log.info(final_output) 329 return (out != '' and err == '') 330 331 def _start_test_app(self): 332 """Start Test Application on Windows.""" 333 # start GUI exe via ssh 334 start_app_cmd = self.SSH_START_GUI_APP_CMD_FORMAT.format( 335 exe_path=self.ta_exe_path) 336 stdin, stdout, stderr = self.ssh_client.exec_command(start_app_cmd) 337 self.log.info(f'Command sent to {self.uxm_ip}: {start_app_cmd}') 338 stdin.close() 339 err = ''.join(stderr.readlines()) 340 out = ''.join(stdout.readlines()) 341 # psexec return process ID as part of the exit code 342 exit_status = stderr.channel.recv_exit_status() 343 is_started = re.search( 344 self.PSEXEC_PROC_STARTED_REGEX_FORMAT.format(proc_id=exit_status), 345 err[-1]) 346 if is_started: 347 raise RuntimeError('Fail to start TA: ' + out + err) 348 # wait for ta completely boot up 349 self.log.info('TA is starting') 350 time.sleep(self.TA_BOOT_TIME) 351 352 def recovery_ta(self): 353 """Start TA if it is not running.""" 354 if not self.is_ta_running(): 355 self._start_test_app() 356 # checking if ta booting process complete 357 # by checking socket connection 358 s = None 359 retries = 12 360 for _ in range(retries): 361 try: 362 s = self._socket_connect(self.uxm_ip, self.UXM_SOCKET_PORT) 363 s.close() 364 return 365 except ConnectionRefusedError as cre: 366 self.log.info( 367 'Connection refused, wait 10s for TA to boot') 368 time.sleep(10) 369 raise RuntimeError('TA does not start on time') 370 371 def set_rockbottom_script_path(self, path): 372 """Set path to rockbottom script. 373 374 Args: 375 path: path to rockbottom script. 376 """ 377 self.rockbottom_script = path 378 379 def set_cell_info(self, cell_info): 380 """Set type and number for multiple cells. 381 382 Args: 383 cell_info: list of dictionaries, 384 each dictionary contain cell type 385 and cell number for each cell 386 that the simulator need to control. 387 """ 388 if not cell_info: 389 raise ValueError('Missing cell info from configurations file') 390 self.cells = cell_info 391 392 def deregister_ue_ims(self): 393 """Remove UE IMS profile from UXM.""" 394 self._socket_send_SCPI_command( 395 self.SCPI_DEREGISTER_UE_IMS) 396 397 def create_dedicated_bearer(self): 398 """Create a dedicated bearer setup for ims call. 399 400 After UE connected and register on UXM IMS tab. 401 It is required to create a dedicated bearer setup 402 with EPS bearer ID 10. 403 """ 404 cell_number = self.cells[0][self._KEY_CELL_NUMBER] 405 self._socket_send_SCPI_command( 406 self.SCPI_CREATE_DEDICATED_BEARER.format(cell_number)) 407 408 def turn_cell_on(self, cell_type, cell_number): 409 """Turn UXM's cell on. 410 411 Args: 412 cell_type: type of cell (e.g NR5G, LTE). 413 cell_number: ordinal number of a cell. 414 """ 415 if cell_type and cell_number: 416 self._socket_send_SCPI_command( 417 self.SCPI_CELL_ON_CMD.format(cell_type, cell_number)) 418 else: 419 raise ValueError('Invalid cell info\n' + 420 f' cell type: {cell_type}\n' + 421 f' cell number: {cell_number}\n') 422 423 def turn_cell_off(self, cell_type, cell_number): 424 """Turn UXM's cell off. 425 426 Args: 427 cell_type: type of cell (e.g NR5G, LTE). 428 cell_number: ordinal number of a cell. 429 """ 430 if cell_type and cell_number: 431 self._socket_send_SCPI_command( 432 self.SCPI_CELL_OFF_CMD.format(cell_type, cell_number)) 433 else: 434 raise ValueError('Invalid cell info\n' + 435 f' cell type: {cell_type}\n' + 436 f' cell number: {cell_number}\n') 437 438 def get_all_cell_status(self): 439 """Gets status of all cells. 440 441 Returns: 442 List of tuples which has values (cell_type, cell_number, cell_status) 443 """ 444 res = [] 445 for cell in self.cells: 446 cell_type = cell[self._KEY_CELL_TYPE] 447 cell_number = cell[self._KEY_CELL_NUMBER] 448 cell_status = self.get_cell_status(cell_type, cell_number) 449 res.append((cell_type, cell_number, cell_status)) 450 return res 451 452 def get_cell_status(self, cell_type, cell_number): 453 """Get status of cell. 454 455 Args: 456 cell_type: type of cell (e.g NR5G, LTE). 457 cell_number: ordinal number of a cell. 458 """ 459 if not cell_type or not cell_number: 460 raise ValueError('Invalid cell with\n' + 461 f' cell type: {cell_type}\n' + 462 f' cell number: {cell_number}\n') 463 464 return self._socket_send_SCPI_for_result_command( 465 self.SCPI_GET_CELL_STATUS.format(cell_type, cell_number)) 466 467 def check_socket_connection(self): 468 """Check if the socket connection is established. 469 470 Query the identification of the Keysight Test Application 471 we are trying to connect to. Empty response indicates 472 connection fail, and vice versa. 473 """ 474 self.socket.sendall(self.SCPI_CHECK_CONNECTION_CMD.encode()) 475 response = self.socket.recv(1024).decode() 476 if response: 477 self.log.info(f'Connected to: {response}') 478 else: 479 self.log.error('Fail to connect to callbox') 480 481 def _socket_connect(self, host, port): 482 """Create socket connection. 483 484 Args: 485 host: IP address of desktop where Keysight Test Application resides. 486 port: port that Keysight Test Application is listening for socket 487 communication. 488 Returns: 489 s: socket object. 490 """ 491 self.log.info('Establishing connection to callbox via socket') 492 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 493 s.connect((host, port)) 494 return s 495 496 def _socket_send_SCPI_command(self, command): 497 """Send SCPI command without expecting response. 498 499 Args: 500 command: a string SCPI command. 501 """ 502 # make sure there is a line break for the socket to send command 503 command = command + '\n' 504 # send command 505 self.socket.sendall(command.encode()) 506 self.log.info(f'Sent {command}') 507 508 def _socket_receive_SCPI_result(self): 509 """Receive response from socket. """ 510 i = 1 511 response = '' 512 while i < self.timeout and not response: 513 response = self.socket.recv(1024).decode() 514 i += 1 515 return response 516 517 def _socket_send_SCPI_for_result_command(self, command): 518 """Send SCPI command and expecting response. 519 520 Args: 521 command: a string SCPI command. 522 """ 523 self._socket_send_SCPI_command(command) 524 response = self._socket_receive_SCPI_result() 525 return response 526 527 def check_system_error(self): 528 """Query system error from Keysight Test Application. 529 530 Returns: 531 status: a message indicate the number of errors 532 and detail of errors if any. 533 a string `0,"No error"` indicates no error. 534 """ 535 status = self._socket_send_SCPI_for_result_command( 536 self.SCPI_SYSTEM_ERROR_CHECK_CMD) 537 self.log.info(f'System error status: {status}') 538 return status 539 540 def import_configuration(self, path): 541 """Import SCPI config file. 542 543 Args: 544 path: path to SCPI file. 545 """ 546 self._socket_send_SCPI_command( 547 self.SCPI_SETTINGS_PRESET_CMD) 548 time.sleep(10) 549 self._socket_send_SCPI_command( 550 self.SCPI_IMPORT_SCPI_FILE_CMD.format(path)) 551 time.sleep(45) 552 553 def destroy(self): 554 """Close socket connection with UXM. """ 555 self.socket.close() 556 557 def setup_lte_scenario(self, path): 558 """Configures the equipment for an LTE simulation. 559 560 Args: 561 path: path to SCPI config file. 562 """ 563 self.import_configuration(path) 564 565 def set_sim_type(self, is_3gpp_sim): 566 sim_type = 'KEYSight' 567 if is_3gpp_sim: 568 sim_type = 'TEST3GPP' 569 self._socket_send_SCPI_command( 570 self.SCPI_CHANGE_SIM_NR_CMD.format(sim_type)) 571 time.sleep(2) 572 self._socket_send_SCPI_command( 573 self.SCPI_CHANGE_SIM_LTE_CMD.format(sim_type)) 574 time.sleep(2) 575 576 def wait_until_attached_one_cell(self, 577 cell_type, 578 cell_number, 579 dut, 580 wait_for_camp_interval, 581 attach_retries): 582 """Wait until connect to given UXM cell. 583 584 After turn off airplane mode, sleep for 585 wait_for_camp_interval seconds for device to camp. 586 If not device is not connected after the wait, 587 either toggle airplane mode on/off or reboot device. 588 Args: 589 cell_type: type of cell 590 which we are trying to connect to. 591 cell_number: ordinal number of a cell 592 which we are trying to connect to. 593 dut: a AndroidCellular controller. 594 wait_for_camp_interval: sleep interval, 595 wait for device to camp. 596 attach_retries: number of retry 597 to wait for device 598 to connect to 1 basestation. 599 Raise: 600 RuntimeError: device unable to connect to cell. 601 """ 602 # airplane mode on 603 dut.toggle_airplane_mode(True) 604 time.sleep(5) 605 606 # turn cell on 607 self.turn_cell_on(cell_type, cell_number) 608 time.sleep(5) 609 610 interval = 10 611 # waits for device to camp 612 for index in range(1, attach_retries+1): 613 count = 0 614 # airplane mode off 615 dut.toggle_airplane_mode(False) 616 time.sleep(5) 617 # check connection in small interval 618 while count < wait_for_camp_interval: 619 time.sleep(interval) 620 cell_state = self.get_cell_status(cell_type, cell_number) 621 self.log.info(f'cell state: {cell_state}') 622 if cell_state == 'CONN\n': 623 # wait for connection stable 624 time.sleep(15) 625 # check connection status again 626 cell_state = self.get_cell_status(cell_type, cell_number) 627 self.log.info(f'cell state: {cell_state}') 628 if cell_state == 'CONN\n': 629 return True 630 if cell_state == 'OFF\n': 631 self.turn_cell_on(cell_type, cell_number) 632 time.sleep(5) 633 count += interval 634 635 # reboot device 636 if (index % 2) == 0: 637 dut.ad.reboot() 638 639 # toggle APM and cell on/off 640 elif (index % 1) == 0: 641 # Toggle APM on 642 dut.toggle_airplane_mode(True) 643 time.sleep(5) 644 645 # Toggle simulator cell 646 self.turn_cell_off(cell_type, cell_number) 647 time.sleep(5) 648 self.turn_cell_on(cell_type, cell_number) 649 time.sleep(5) 650 651 # Toggle APM off 652 dut.toggle_airplane_mode(False) 653 time.sleep(5) 654 # increase length of small waiting interval 655 interval += 5 656 657 # Phone cannot connected to basestation of callbox 658 raise RuntimeError( 659 f'Phone was unable to connect to cell: {cell_type}-{cell_number}') 660 661 def wait_until_attached(self, dut, timeout, attach_retries): 662 """Waits until the DUT is attached to all required cells. 663 664 Args: 665 dut: a CellularAndroid controller. 666 timeout: sleep interval, 667 wait for device to camp in 1 try. 668 attach_retries: number of retry 669 to wait for device 670 to connect to 1 basestation. 671 """ 672 # get cell info 673 first_cell_type = self.cells[0][self._KEY_CELL_TYPE] 674 first_cell_number = self.cells[0][self._KEY_CELL_NUMBER] 675 if len(self.cells) == 2: 676 second_cell_type = self.cells[1][self._KEY_CELL_TYPE] 677 second_cell_number = self.cells[1][self._KEY_CELL_NUMBER] 678 679 # connect to 1st cell 680 self.wait_until_attached_one_cell(first_cell_type, 681 first_cell_number, dut, timeout, 682 attach_retries) 683 684 # aggregation to NR 685 if len(self.cells) == 2: 686 self.turn_cell_on( 687 second_cell_type, 688 second_cell_number, 689 ) 690 691 for _ in range(1, attach_retries+1): 692 self.log.info('Try to aggregate to NR.') 693 self._socket_send_SCPI_command( 694 'BSE:CONFig:LTE:CELL1:CAGGregation:AGGRegate:NRCC:DL None') 695 self._socket_send_SCPI_command( 696 'BSE:CONFig:LTE:CELL1:CAGGregation:AGGRegate:NRCC:UL None') 697 self._socket_send_SCPI_command( 698 'BSE:CONFig:LTE:CELL1:CAGGregation:AGGRegate:NRCC:DL CELL1') 699 self._socket_send_SCPI_command( 700 'BSE:CONFig:LTE:CELL1:CAGGregation:AGGRegate:NRCC:DL CELL1') 701 time.sleep(1) 702 self._socket_send_SCPI_command( 703 "BSE:CONFig:LTE:CELL1:CAGGregation:AGGRegate:NRCC:APPly") 704 # wait for status stable 705 time.sleep(10) 706 cell_state = self.get_cell_status(second_cell_type, second_cell_number) 707 self.log.info(f'cell state: {cell_state}') 708 if cell_state == 'CONN\n': 709 return 710 else: 711 self.turn_cell_off(second_cell_type, second_cell_number) 712 # wait for LTE cell to connect again 713 self.wait_until_attached_one_cell(first_cell_type, 714 first_cell_number, dut, 120, 715 2) 716 717 raise RuntimeError(f'Fail to aggregate to NR from LTE.') 718 719 def modify_dl_ul_mac_padding(self): 720 """Disables dl/ul mac padding packets.""" 721 self.log.info('modifying dl ul mac padding') 722 self._socket_send_SCPI_command(self._SCPI_CHANGE_DL_TDOMAIN) 723 self._socket_send_SCPI_command(self._SCPI_CHANGE_UL_TDOMAIN) 724 725 def set_lte_rrc_state_change_timer(self, enabled, time=10): 726 """Configures the LTE RRC state change timer. 727 728 Args: 729 enabled: a boolean indicating if the timer should be on or off. 730 time: time in seconds for the timer to expire. 731 """ 732 raise NotImplementedError( 733 'This UXM callbox simulator does not support this feature.') 734 735 def set_band(self, bts_index, band): 736 """Sets the band for the indicated base station. 737 738 Args: 739 bts_index: the base station number. 740 band: the new band. 741 """ 742 raise NotImplementedError( 743 'This UXM callbox simulator does not support this feature.') 744 745 def get_duplex_mode(self, band): 746 """Determines if the band uses FDD or TDD duplex mode 747 748 Args: 749 band: a band number. 750 751 Returns: 752 an variable of class DuplexMode indicating if band is FDD or TDD. 753 """ 754 raise NotImplementedError( 755 'This UXM callbox simulator does not support this feature.') 756 757 def set_input_power(self, bts_index, input_power): 758 """Sets the input power for the indicated base station. 759 760 Args: 761 bts_index: the base station number. 762 input_power: the new input power. 763 """ 764 raise NotImplementedError( 765 'This UXM callbox simulator does not support this feature.') 766 767 def set_output_power(self, bts_index, output_power): 768 """Sets the output power for the indicated base station. 769 770 Args: 771 bts_index: the base station number. 772 output_power: the new output power. 773 """ 774 raise NotImplementedError( 775 'This UXM callbox simulator does not support this feature.') 776 777 def set_tdd_config(self, bts_index, tdd_config): 778 """Sets the tdd configuration number for the indicated base station. 779 780 Args: 781 bts_index: the base station number. 782 tdd_config: the new tdd configuration number. 783 """ 784 raise NotImplementedError( 785 'This UXM callbox simulator does not support this feature.') 786 787 def set_ssf_config(self, bts_index, ssf_config): 788 """Sets the Special Sub-Frame config number for the indicated. 789 790 base station. 791 792 Args: 793 bts_index: the base station number. 794 ssf_config: the new ssf config number. 795 """ 796 raise NotImplementedError( 797 'This UXM callbox simulator does not support this feature.') 798 799 def set_bandwidth(self, bts_index, bandwidth): 800 """Sets the bandwidth for the indicated base station. 801 802 Args: 803 bts_index: the base station number 804 bandwidth: the new bandwidth 805 """ 806 raise NotImplementedError( 807 'This UXM callbox simulator does not support this feature.') 808 809 def set_downlink_channel_number(self, bts_index, channel_number): 810 """Sets the downlink channel number for the indicated base station. 811 812 Args: 813 bts_index: the base station number. 814 channel_number: the new channel number. 815 """ 816 raise NotImplementedError( 817 'This UXM callbox simulator does not support this feature.') 818 819 def set_mimo_mode(self, bts_index, mimo_mode): 820 """Sets the mimo mode for the indicated base station. 821 822 Args: 823 bts_index: the base station number 824 mimo_mode: the new mimo mode 825 """ 826 raise NotImplementedError( 827 'This UXM callbox simulator does not support this feature.') 828 829 def set_transmission_mode(self, bts_index, tmode): 830 """Sets the transmission mode for the indicated base station. 831 832 Args: 833 bts_index: the base station number. 834 tmode: the new transmission mode. 835 """ 836 raise NotImplementedError( 837 'This UXM callbox simulator does not support this feature.') 838 839 def set_scheduling_mode(self, 840 bts_index, 841 scheduling, 842 mcs_dl=None, 843 mcs_ul=None, 844 nrb_dl=None, 845 nrb_ul=None): 846 """Sets the scheduling mode for the indicated base station. 847 848 Args: 849 bts_index: the base station number. 850 scheduling: the new scheduling mode. 851 mcs_dl: Downlink MCS. 852 mcs_ul: Uplink MCS. 853 nrb_dl: Number of RBs for downlink. 854 nrb_ul: Number of RBs for uplink. 855 """ 856 raise NotImplementedError( 857 'This UXM callbox simulator does not support this feature.') 858 859 def set_dl_256_qam_enabled(self, bts_index, enabled): 860 """Determines what MCS table should be used for the downlink. 861 862 This only saves the setting that will be used when configuring MCS. 863 864 Args: 865 bts_index: the base station number. 866 enabled: whether 256 QAM should be used. 867 """ 868 raise NotImplementedError( 869 'This UXM callbox simulator does not support this feature.') 870 871 def set_ul_64_qam_enabled(self, bts_index, enabled): 872 """Determines what MCS table should be used for the uplink. 873 874 This only saves the setting that will be used when configuring MCS. 875 876 Args: 877 bts_index: the base station number. 878 enabled: whether 64 QAM should be used. 879 """ 880 raise NotImplementedError( 881 'This UXM callbox simulator does not support this feature.') 882 883 def set_mac_padding(self, bts_index, mac_padding): 884 """Enables or disables MAC padding in the indicated base station. 885 886 Args: 887 bts_index: the base station number. 888 mac_padding: the new MAC padding setting. 889 """ 890 raise NotImplementedError( 891 'This UXM callbox simulator does not support this feature.') 892 893 def set_cfi(self, bts_index, cfi): 894 """Sets the Channel Format Indicator for the indicated base station. 895 896 Args: 897 bts_index: the base station number. 898 cfi: the new CFI setting. 899 """ 900 raise NotImplementedError( 901 'This UXM callbox simulator does not support this feature.') 902 903 def set_paging_cycle(self, bts_index, cycle_duration): 904 """Sets the paging cycle duration for the indicated base station. 905 906 Args: 907 bts_index: the base station number. 908 cycle_duration: the new paging cycle duration in milliseconds. 909 """ 910 raise NotImplementedError( 911 'This UXM callbox simulator does not support this feature.') 912 913 def set_phich_resource(self, bts_index, phich): 914 """Sets the PHICH Resource setting for the indicated base station. 915 916 Args: 917 bts_index: the base station number. 918 phich: the new PHICH resource setting. 919 """ 920 raise NotImplementedError( 921 'This UXM callbox simulator does not support this feature.') 922 923 def lte_attach_secondary_carriers(self, ue_capability_enquiry): 924 """Activates the secondary carriers for CA. 925 926 Requires the DUT to be attached to the primary carrier first. 927 928 Args: 929 ue_capability_enquiry: UE capability enquiry message to be sent to 930 the UE before starting carrier aggregation. 931 """ 932 raise NotImplementedError( 933 'This UXM callbox simulator does not support this feature.') 934 935 def wait_until_communication_state(self, timeout=120): 936 """Waits until the DUT is in Communication state. 937 938 Args: 939 timeout: after this amount of time the method will raise 940 a CellularSimulatorError exception. Default is 120 seconds. 941 """ 942 raise NotImplementedError( 943 'This UXM callbox simulator does not support this feature.') 944 945 def wait_until_idle_state(self, timeout=120): 946 """Waits until the DUT is in Idle state. 947 948 Args: 949 timeout: after this amount of time the method will raise a 950 CellularSimulatorError exception. Default is 120 seconds. 951 """ 952 # turn on RRC release 953 cell_type = self.cells[0][self._KEY_CELL_TYPE] 954 cell_number = self.cells[0][self._KEY_CELL_NUMBER] 955 956 # choose cmd base on cell type 957 cmd = None 958 if cell_type == 'LTE': 959 cmd = self.SCPI_RRC_RELEASE_LTE_CMD 960 else: 961 cmd = self.SCPI_RRC_RELEASE_NR_CMD 962 963 if not cmd: 964 raise RuntimeError(f'Cell type [{cell_type}] is not supporting IDLE.') 965 966 # checking status 967 self.log.info('Wait for IDLE state.') 968 for _ in range(5): 969 cell_state = self.get_cell_status(cell_type, cell_number) 970 self.log.info(f'cell state: {cell_state}') 971 if cell_state == 'CONN\n': 972 # RRC release 973 self._socket_send_SCPI_command(cmd.format(cell_type, cell_number)) 974 # wait for status stable 975 time.sleep(60) 976 elif cell_state == 'IDLE\n': 977 return 978 979 raise RuntimeError('RRC release fail.') 980 981 def detach(self): 982 """ Turns off all the base stations so the DUT loose connection.""" 983 for cell in self.cells: 984 cell_type = cell[self._KEY_CELL_TYPE] 985 cell_number = cell[self._KEY_CELL_NUMBER] 986 self.turn_cell_off(cell_type, cell_number) 987 time.sleep(5) 988 989 def stop(self): 990 """Stops current simulation. 991 992 After calling this method, the simulator will need to be set up again. 993 """ 994 raise NotImplementedError( 995 'This UXM callbox simulator does not support this feature.') 996 997 def start_data_traffic(self): 998 """Starts transmitting data from the instrument to the DUT. """ 999 raise NotImplementedError( 1000 'This UXM callbox simulator does not support this feature.') 1001 1002 def stop_data_traffic(self): 1003 """Stops transmitting data from the instrument to the DUT. """ 1004 raise NotImplementedError( 1005 'This UXM callbox simulator does not support this feature.') 1006