• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#   Copyright 2022 - The Android Open Source Project
2#
3#   Licensed under the Apache License, Version 2.0 (the 'License');
4#   you may not use this file except in compliance with the License.
5#   You may obtain a copy of the License at
6#
7#       http://www.apache.org/licenses/LICENSE-2.0
8#
9#   Unless required by applicable law or agreed to in writing, software
10#   distributed under the License is distributed on an 'AS IS' BASIS,
11#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12#   See the License for the specific language governing permissions and
13#   limitations under the License.
14import re
15import logging
16import os
17import paramiko
18import socket
19import time
20
21from acts.controllers.cellular_simulator import AbstractCellularSimulator
22
class SocketWrapper():
    """A wrapper for socket communication with test equipment.

    The connection is opened lazily by the first command that is sent and
    may be closed and re-opened any number of times.

    Attributes:
        _socket: the connected socket object, or None while disconnected.
        _ip: a string value for ip address
            which we want to connect.
        _port: an integer for port
            which we want to connect.
        _connecting_timeout: an integer for socket connecting timeout.
        _encode_format: a string specify encoding format.
        _cmd_terminator: a string indicates the end of command/data
            which need to be sent.
        _buff_size: maximum number of bytes read for one response.
        _logger: logger of this module.
    """

    def __init__(self, ip, port,
                 connecting_timeout=120,
                 cmd_terminator='\n',
                 encode_format='utf-8',
                 buff_size=1024):
        self._socket = None
        self._ip = ip
        self._port = port
        self._connecting_timeout = connecting_timeout
        self._cmd_terminator = cmd_terminator
        self._encode_format = encode_format
        self._buff_size = buff_size
        self._logger = logging.getLogger(__name__)

    def _connect(self):
        """Open the connection to (_ip, _port)."""
        # create_connection() leaves its timeout set on the returned socket,
        # which is why a later recv() can raise socket.timeout.
        self._socket = socket.create_connection(
            (self._ip, self._port), timeout=self._connecting_timeout
        )

    def send_command(self, cmd: str):
        """Send a command, connecting first if necessary.

        The command terminator is appended when a non-empty command does
        not already end with it.

        Args:
            cmd: a string command to be sent.
        """
        if not self._socket:
            self._connect()
        # endswith() also handles terminators longer than one character.
        if cmd and not cmd.endswith(self._cmd_terminator):
            cmd = cmd + self._cmd_terminator
        self._socket.sendall(cmd.encode(self._encode_format))

    def send_command_recv(self, cmd: str) -> str:
        """Send data and wait for response

        Args:
            cmd: a string command to be sent.

        Returns:
            a string response (at most _buff_size bytes, decoded).

        Raises:
            socket.timeout: no response arrived in time; the connection
                is closed before the exception is re-raised.
        """
        self.send_command(cmd)
        try:
            raw = self._socket.recv(self._buff_size)
        except socket.timeout:
            self._logger.info('Socket timeout while receiving response.')
            self.close()
            raise

        return raw.decode(self._encode_format)

    def close(self):
        """Close the connection; a no-op when not connected."""
        if self._socket is None:
            return
        try:
            self._socket.close()
        finally:
            # Forget the socket even if close() failed so that the next
            # command reconnects.
            self._socket = None
89
90class UXMCellularSimulator(AbstractCellularSimulator):
91    """A cellular simulator for UXM callbox."""
92
    # Keys to obtain data from cell_info dictionary.
    _KEY_CELL_NUMBER = "cell_number"
    _KEY_CELL_TYPE = "cell_type"

    # Port of the socket used to reach the Keysight Test Application (TA).
    UXM_SOCKET_PORT = 5125

    # UXM SCPI commands.
    # NOTE: some of these already end with '\n', others do not.
    SCPI_IMPORT_STATUS_QUERY_CMD = 'SYSTem:SCPI:IMPort:STATus?'
    SCPI_SYSTEM_ERROR_CHECK_CMD = 'SYST:ERR?\n'
    SCPI_CHECK_CONNECTION_CMD = '*IDN?\n'
    SCPI_DEREGISTER_UE_IMS = 'SYSTem:IMS:SERVer:UE:DERegister'
    _SCPI_CHANGE_DL_TDOMAIN = 'BSE:CONFig:NR5G:CELL1:SCHeduling:BWP0:FC0:SC0:DL:TDOMain:APOLicy ONMac'
    _SCPI_CHANGE_UL_TDOMAIN = 'BSE:CONFig:NR5G:CELL1:SCHeduling:BWP0:FC0:SC0:UL:NUL:TDOMain:APOLicy ONSRbsr'
    # Placeholder: path to the SCPI file.
    SCPI_IMPORT_SCPI_FILE_CMD = 'SYSTem:SCPI:IMPort "{}"\n'
    # Placeholders: 1. cell type (e.g. NR5G), 2. cell number (e.g. CELL1).
    SCPI_CELL_ON_CMD = 'BSE:CONFig:{}:{}:ACTive 1'
    SCPI_CELL_OFF_CMD = 'BSE:CONFig:{}:{}:ACTive 0'
    SCPI_GET_CELL_STATUS = 'BSE:STATus:{}:{}?'
    SCPI_RRC_RELEASE_LTE_CMD = 'BSE:FUNCtion:{}:{}:RELease:SEND'
    SCPI_RRC_RELEASE_NR_CMD = 'BSE:CONFig:{}:{}:RCONtrol:RRC:STARt RRELease'
    # Placeholder: cell number.
    SCPI_CREATE_DEDICATED_BEARER = 'BSE:FUNCtion:LTE:{}:NAS:EBID10:DEDicated:CREate'
    # Placeholder: SIM type ('KEYSight' or 'TEST3GPP', see set_sim_type).
    SCPI_CHANGE_SIM_NR_CMD = 'BSE:CONFig:NR5G:CELL1:SECurity:AUTHenticate:KEY:TYPE {}'
    SCPI_CHANGE_SIM_LTE_CMD = 'BSE:CONFig:LTE:SECurity:AUTHenticate:KEY {}'
    SCPI_SETTINGS_PRESET_CMD = 'SYSTem:PRESet:FULL'

    # UXM's Test Application recovery: seconds slept after launching the TA.
    TA_BOOT_TIME = 100

    # Commands executed on the UXM host over ssh.
    SSH_START_GUI_APP_CMD_FORMAT = 'psexec -s -d -i 1 "{exe_path}"'
    SSH_CHECK_APP_RUNNING_CMD_FORMAT = 'tasklist | findstr /R {regex_app_name}'
    SSH_KILL_PROCESS_BY_NAME = 'taskkill /IM {process_name} /F'
    UXM_TEST_APP_NAME = 'TestApp.exe'

    # Pattern for the message psexec prints when a process was started.
    # NOTE(review): when used as a regular expression, '*' here quantifies
    # the preceding space; it is not a wildcard - confirm the intent.
    PSEXEC_PROC_STARTED_REGEX_FORMAT = 'started on * with process ID {proc_id}'

    # HCCU default values.
    HCCU_SOCKET_PORT = 4882
    # number_of_digit is the number of digits of setup_name_len, which is
    # the length of setup_name (see switch_HCCU_setup).
    HCCU_SCPI_CHANGE_SETUP_CMD = ':SYSTem:SETup:CONFig #{number_of_digit}{setup_name_len}{setup_name}'
    HCCU_SCPI_CHANGE_SCENARIO_CMD = ':SETup:SCENe "((NE_1, {scenario_name}))"'
    HCCU_STATUS_CHECK_CMD = ':SETup:INSTrument:STATus? 0\n'
    HCCU_FR2_SETUP_NAME = '{Name:"TSPC_1UXM5G_HF_2RRH_M1740A"}'
    HCCU_FR1_SETUP_NAME = '{Name:"TSPC_1UXM5G_LF"}'
    HCCU_GET_INSTRUMENT_COUNT_CMD = ':SETup:INSTrument:COUNt?'
    # Instrument counts used to recognise which setup is currently active.
    HCCU_FR2_INSTRUMENT_COUNT = 5
    HCCU_FR1_INSTRUMENT_COUNT = 2
    HCCU_FR2_SCENARIO = 'NR_4DL2x2_2UL2x2_LTE_4CC'
    HCCU_FR1_SCENARIO = 'NR_1DL4x4_1UL2x2_LTE_4CC'
146
147
148    def __init__(self, ip_address, custom_files,uxm_user,
149                 ssh_private_key_to_uxm, ta_exe_path, ta_exe_name):
150        """Initializes the cellular simulator.
151
152        Args:
153            ip_address: the ip address of host where Keysight Test Application (TA)
154                is installed.
155            custom_files: a list of file path for custom files.
156            uxm_user: username of host where Keysight TA resides.
157            ssh_private_key_to_uxm: private key for key based ssh to
158                host where Keysight TA resides.
159            ta_exe_path: path to TA exe.
160            ta_exe_name: name of TA exe.
161        """
162        super().__init__()
163        self.custom_files = custom_files
164        self.rockbottom_script = None
165        self.cells = []
166        self.uxm_ip = ip_address
167        self.uxm_user = uxm_user
168        self.ssh_private_key_to_uxm = os.path.expanduser(
169                                        ssh_private_key_to_uxm)
170        self.ta_exe_path = ta_exe_path
171        self.ta_exe_name = ta_exe_name
172        self.ssh_client = self.create_ssh_client()
173
174        # get roclbottom file
175        for file in self.custom_files:
176            if 'rockbottom_' in file:
177                self.rockbottom_script = file
178
179        # connect to Keysight Test Application via socket
180        self.recovery_ta()
181        self.socket = self._socket_connect(self.uxm_ip, self.UXM_SOCKET_PORT)
182        self.check_socket_connection()
183        self.timeout = 120
184
185        # hccu socket
186        self.hccu_socket_port = self.HCCU_SOCKET_PORT
187        self.hccu_socket = SocketWrapper(self.uxm_ip, self.hccu_socket_port)
188
189    def socket_connect(self):
190        self.socket = self._socket_connect(self.uxm_ip, self.UXM_SOCKET_PORT)
191
192    def switch_HCCU_scenario(self, scenario_name: str):
193        cmd = self.HCCU_SCPI_CHANGE_SCENARIO_CMD.format(
194            scenario_name=scenario_name)
195        self.hccu_socket.send_command(cmd)
196        self.log.debug(f'Sent command: {cmd}')
197        # this is require for the command to take effect
198        # because hccu's port need to be free.
199        self.hccu_socket.close()
200
201    def switch_HCCU_setup(self, setup_name: str):
202        """Change HHCU system setup.
203
204        Args:
205            setup_name: a string name
206                of the system setup will be changed to.
207        """
208        setup_name_len = str(len(setup_name))
209        number_of_digit = str(len(setup_name_len))
210        cmd = self.HCCU_SCPI_CHANGE_SETUP_CMD.format(
211            number_of_digit=number_of_digit,
212            setup_name_len=setup_name_len,
213            setup_name=setup_name
214        )
215        self.hccu_socket.send_command(cmd)
216        self.log.debug(f'Sent command: {cmd}')
217        # this is require for the command to take effect
218        # because hccu's port need to be free.
219        self.hccu_socket.close()
220
221    def wait_until_hccu_operational(self, timeout=1200):
222        """ Wait for hccu is ready to operate for a specified timeout.
223
224        Args:
225            timeout: time we are waiting for
226                hccu in opertional status.
227
228        Returns:
229            True if HCCU status is operational within timeout.
230            False otherwise.
231        """
232        # check status
233        self.log.info('Waiting for HCCU to ready to operate.')
234        cmd = self.HCCU_STATUS_CHECK_CMD
235        t = 0
236        interval = 10
237        while t < timeout:
238            response = self.hccu_socket.send_command_recv(cmd)
239            if response == 'OPER\n':
240                return True
241            time.sleep(interval)
242            t += interval
243        return False
244
245    def switch_HCCU_settings(self, is_fr2: bool):
246        """Set HCCU setup configuration.
247
248        HCCU stands for Hardware Configuration Control Utility,
249        an interface allows us to control Keysight Test Equipment.
250
251        Args:
252            is_fr2: a bool value.
253        """
254        # change HCCU configration
255        data = ''
256        scenario_name = ''
257        instrument_count_res = self.hccu_socket.send_command_recv(
258            self.HCCU_GET_INSTRUMENT_COUNT_CMD)
259        instrument_count = int(instrument_count_res)
260        # if hccu setup is correct, no need to change.
261        if is_fr2 and instrument_count == self.HCCU_FR2_INSTRUMENT_COUNT:
262            self.log.info('UXM has correct HCCU setup.')
263            return
264        if not is_fr2 and instrument_count == self.HCCU_FR1_INSTRUMENT_COUNT:
265            self.log.info('UXM has correct HCCU setup.')
266            return
267
268        self.log.info('UXM has incorrect HCCU setup, start changing setup.')
269        # terminate TA and close socket
270        self.log.info('Terminate TA before switch HCCU settings.')
271        self.terminate_process(self.UXM_TEST_APP_NAME)
272        self.socket.close()
273
274        # change hccu setup
275        if is_fr2:
276            data = self.HCCU_FR2_SETUP_NAME
277            scenario_name = self.HCCU_FR2_SCENARIO
278        else:
279            data = self.HCCU_FR1_SETUP_NAME
280            scenario_name = self.HCCU_FR1_SCENARIO
281        self.log.info('Switch HCCU setup.')
282        self.switch_HCCU_setup(data)
283        time.sleep(10)
284        if not self.wait_until_hccu_operational():
285            raise RuntimeError('Fail to switch HCCU setup.')
286
287        # change scenario
288        self.log.info('Ativate HCCU scenario.')
289        self.switch_HCCU_scenario(scenario_name)
290        time.sleep(40)
291        if not self.wait_until_hccu_operational():
292            raise RuntimeError('Fail to switch HCCU scenario.')
293
294        # start TA and reconnect socket.
295        self.recovery_ta()
296        self.socket = self._socket_connect(self.uxm_ip, self.UXM_SOCKET_PORT)
297
298    def create_ssh_client(self):
299        """Create a ssh client to host."""
300        ssh = paramiko.SSHClient()
301        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
302        mykey = paramiko.Ed25519Key.from_private_key_file(
303            self.ssh_private_key_to_uxm)
304        ssh.connect(hostname=self.uxm_ip, username=self.uxm_user, pkey=mykey)
305        self.log.info('SSH client to %s is connected' % self.uxm_ip)
306        return ssh
307
308    def terminate_process(self, process_name):
309        cmd = self.SSH_KILL_PROCESS_BY_NAME.format(
310            process_name=process_name
311        )
312        stdin, stdout, stderr = self.ssh_client.exec_command(cmd)
313        stdin.close()
314        err = ''.join(stderr.readlines())
315        out = ''.join(stdout.readlines())
316        final_output = str(out) + str(err)
317        self.log.info(final_output)
318        return out
319
320    def is_ta_running(self):
321        is_running_cmd = self.SSH_CHECK_APP_RUNNING_CMD_FORMAT.format(
322            regex_app_name=self.ta_exe_name)
323        stdin, stdout, stderr = self.ssh_client.exec_command(is_running_cmd)
324        stdin.close()
325        err = ''.join(stderr.readlines())
326        out = ''.join(stdout.readlines())
327        final_output = str(out) + str(err)
328        self.log.info(final_output)
329        return (out != '' and err == '')
330
331    def _start_test_app(self):
332        """Start Test Application on Windows."""
333        # start GUI exe via ssh
334        start_app_cmd = self.SSH_START_GUI_APP_CMD_FORMAT.format(
335            exe_path=self.ta_exe_path)
336        stdin, stdout, stderr = self.ssh_client.exec_command(start_app_cmd)
337        self.log.info(f'Command sent to {self.uxm_ip}: {start_app_cmd}')
338        stdin.close()
339        err = ''.join(stderr.readlines())
340        out = ''.join(stdout.readlines())
341        # psexec return process ID as part of the exit code
342        exit_status = stderr.channel.recv_exit_status()
343        is_started = re.search(
344            self.PSEXEC_PROC_STARTED_REGEX_FORMAT.format(proc_id=exit_status),
345            err[-1])
346        if is_started:
347            raise RuntimeError('Fail to start TA: ' + out + err)
348        # wait for ta completely boot up
349        self.log.info('TA is starting')
350        time.sleep(self.TA_BOOT_TIME)
351
352    def recovery_ta(self):
353        """Start TA if it is not running."""
354        if not self.is_ta_running():
355            self._start_test_app()
356            # checking if ta booting process complete
357            # by checking socket connection
358            s = None
359            retries = 12
360            for _ in range(retries):
361                try:
362                    s = self._socket_connect(self.uxm_ip, self.UXM_SOCKET_PORT)
363                    s.close()
364                    return
365                except ConnectionRefusedError as cre:
366                    self.log.info(
367                        'Connection refused, wait 10s for TA to boot')
368                    time.sleep(10)
369            raise RuntimeError('TA does not start on time')
370
371    def set_rockbottom_script_path(self, path):
372        """Set path to rockbottom script.
373
374        Args:
375            path: path to rockbottom script.
376        """
377        self.rockbottom_script = path
378
379    def set_cell_info(self, cell_info):
380        """Set type and number for multiple cells.
381
382        Args:
383            cell_info: list of dictionaries,
384                each dictionary contain cell type
385                and cell number for each cell
386                that the simulator need to control.
387        """
388        if not cell_info:
389            raise ValueError('Missing cell info from configurations file')
390        self.cells = cell_info
391
392    def deregister_ue_ims(self):
393        """Remove UE IMS profile from UXM."""
394        self._socket_send_SCPI_command(
395                self.SCPI_DEREGISTER_UE_IMS)
396
397    def create_dedicated_bearer(self):
398        """Create a dedicated bearer setup for ims call.
399
400        After UE connected and register on UXM IMS tab.
401        It is required to create a dedicated bearer setup
402        with EPS bearer ID 10.
403        """
404        cell_number = self.cells[0][self._KEY_CELL_NUMBER]
405        self._socket_send_SCPI_command(
406                self.SCPI_CREATE_DEDICATED_BEARER.format(cell_number))
407
408    def turn_cell_on(self, cell_type, cell_number):
409        """Turn UXM's cell on.
410
411        Args:
412            cell_type: type of cell (e.g NR5G, LTE).
413            cell_number: ordinal number of a cell.
414        """
415        if cell_type and cell_number:
416            self._socket_send_SCPI_command(
417                self.SCPI_CELL_ON_CMD.format(cell_type, cell_number))
418        else:
419            raise ValueError('Invalid cell info\n' +
420                             f' cell type: {cell_type}\n' +
421                             f' cell number: {cell_number}\n')
422
423    def turn_cell_off(self, cell_type, cell_number):
424        """Turn UXM's cell off.
425
426        Args:
427            cell_type: type of cell (e.g NR5G, LTE).
428            cell_number: ordinal number of a cell.
429        """
430        if cell_type and cell_number:
431            self._socket_send_SCPI_command(
432                self.SCPI_CELL_OFF_CMD.format(cell_type, cell_number))
433        else:
434            raise ValueError('Invalid cell info\n' +
435                             f' cell type: {cell_type}\n' +
436                             f' cell number: {cell_number}\n')
437
438    def get_all_cell_status(self):
439        """Gets status of all cells.
440
441        Returns:
442        List of tuples which has values (cell_type, cell_number, cell_status)
443        """
444        res = []
445        for cell in self.cells:
446            cell_type = cell[self._KEY_CELL_TYPE]
447            cell_number = cell[self._KEY_CELL_NUMBER]
448            cell_status = self.get_cell_status(cell_type, cell_number)
449            res.append((cell_type, cell_number, cell_status))
450        return res
451
452    def get_cell_status(self, cell_type, cell_number):
453        """Get status of cell.
454
455        Args:
456            cell_type: type of cell (e.g NR5G, LTE).
457            cell_number: ordinal number of a cell.
458        """
459        if not cell_type or not cell_number:
460            raise ValueError('Invalid cell with\n' +
461                             f' cell type: {cell_type}\n' +
462                             f' cell number: {cell_number}\n')
463
464        return self._socket_send_SCPI_for_result_command(
465            self.SCPI_GET_CELL_STATUS.format(cell_type, cell_number))
466
467    def check_socket_connection(self):
468        """Check if the socket connection is established.
469
470        Query the identification of the Keysight Test Application
471        we are trying to connect to. Empty response indicates
472        connection fail, and vice versa.
473        """
474        self.socket.sendall(self.SCPI_CHECK_CONNECTION_CMD.encode())
475        response = self.socket.recv(1024).decode()
476        if response:
477            self.log.info(f'Connected to: {response}')
478        else:
479            self.log.error('Fail to connect to callbox')
480
481    def _socket_connect(self, host, port):
482        """Create socket connection.
483
484        Args:
485            host: IP address of desktop where Keysight Test Application resides.
486            port: port that Keysight Test Application is listening for socket
487                communication.
488        Returns:
489            s: socket object.
490        """
491        self.log.info('Establishing connection to callbox via socket')
492        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
493        s.connect((host, port))
494        return s
495
496    def _socket_send_SCPI_command(self, command):
497        """Send SCPI command without expecting response.
498
499        Args:
500            command: a string SCPI command.
501        """
502        # make sure there is a line break for the socket to send command
503        command = command + '\n'
504        # send command
505        self.socket.sendall(command.encode())
506        self.log.info(f'Sent {command}')
507
508    def _socket_receive_SCPI_result(self):
509        """Receive response from socket. """
510        i = 1
511        response = ''
512        while i < self.timeout and not response:
513            response = self.socket.recv(1024).decode()
514            i += 1
515        return response
516
517    def _socket_send_SCPI_for_result_command(self, command):
518        """Send SCPI command and expecting response.
519
520        Args:
521            command: a string SCPI command.
522        """
523        self._socket_send_SCPI_command(command)
524        response = self._socket_receive_SCPI_result()
525        return response
526
527    def check_system_error(self):
528        """Query system error from Keysight Test Application.
529
530        Returns:
531            status: a message indicate the number of errors
532                and detail of errors if any.
533                a string `0,"No error"` indicates no error.
534        """
535        status = self._socket_send_SCPI_for_result_command(
536            self.SCPI_SYSTEM_ERROR_CHECK_CMD)
537        self.log.info(f'System error status: {status}')
538        return status
539
540    def import_configuration(self, path):
541        """Import SCPI config file.
542
543        Args:
544            path: path to SCPI file.
545        """
546        self._socket_send_SCPI_command(
547            self.SCPI_SETTINGS_PRESET_CMD)
548        time.sleep(10)
549        self._socket_send_SCPI_command(
550            self.SCPI_IMPORT_SCPI_FILE_CMD.format(path))
551        time.sleep(45)
552
553    def destroy(self):
554        """Close socket connection with UXM. """
555        self.socket.close()
556
557    def setup_lte_scenario(self, path):
558        """Configures the equipment for an LTE simulation.
559
560        Args:
561            path: path to SCPI config file.
562        """
563        self.import_configuration(path)
564
565    def set_sim_type(self, is_3gpp_sim):
566        sim_type = 'KEYSight'
567        if is_3gpp_sim:
568            sim_type = 'TEST3GPP'
569        self._socket_send_SCPI_command(
570            self.SCPI_CHANGE_SIM_NR_CMD.format(sim_type))
571        time.sleep(2)
572        self._socket_send_SCPI_command(
573            self.SCPI_CHANGE_SIM_LTE_CMD.format(sim_type))
574        time.sleep(2)
575
576    def wait_until_attached_one_cell(self,
577                                     cell_type,
578                                     cell_number,
579                                     dut,
580                                     wait_for_camp_interval,
581                                     attach_retries):
582        """Wait until connect to given UXM cell.
583
584        After turn off airplane mode, sleep for
585        wait_for_camp_interval seconds for device to camp.
586        If not device is not connected after the wait,
587        either toggle airplane mode on/off or reboot device.
588        Args:
589            cell_type: type of cell
590                which we are trying to connect to.
591            cell_number: ordinal number of a cell
592                which we are trying to connect to.
593            dut: a AndroidCellular controller.
594            wait_for_camp_interval: sleep interval,
595                wait for device to camp.
596            attach_retries: number of retry
597                to wait for device
598                to connect to 1 basestation.
599        Raise:
600            RuntimeError: device unable to connect to cell.
601        """
602        # airplane mode on
603        dut.toggle_airplane_mode(True)
604        time.sleep(5)
605
606        # turn cell on
607        self.turn_cell_on(cell_type, cell_number)
608        time.sleep(5)
609
610        interval = 10
611        # waits for device to camp
612        for index in range(1, attach_retries+1):
613            count = 0
614            # airplane mode off
615            dut.toggle_airplane_mode(False)
616            time.sleep(5)
617            # check connection in small interval
618            while count < wait_for_camp_interval:
619                time.sleep(interval)
620                cell_state = self.get_cell_status(cell_type, cell_number)
621                self.log.info(f'cell state: {cell_state}')
622                if cell_state == 'CONN\n':
623                    # wait for connection stable
624                    time.sleep(15)
625                    # check connection status again
626                    cell_state = self.get_cell_status(cell_type, cell_number)
627                    self.log.info(f'cell state: {cell_state}')
628                    if cell_state == 'CONN\n':
629                        return True
630                if cell_state == 'OFF\n':
631                    self.turn_cell_on(cell_type, cell_number)
632                    time.sleep(5)
633                count += interval
634
635            # reboot device
636            if (index % 2) == 0:
637                dut.ad.reboot()
638
639            # toggle APM and cell on/off
640            elif (index % 1) == 0:
641                # Toggle APM on
642                dut.toggle_airplane_mode(True)
643                time.sleep(5)
644
645                # Toggle simulator cell
646                self.turn_cell_off(cell_type, cell_number)
647                time.sleep(5)
648                self.turn_cell_on(cell_type, cell_number)
649                time.sleep(5)
650
651                # Toggle APM off
652                dut.toggle_airplane_mode(False)
653                time.sleep(5)
654            # increase length of small waiting interval
655            interval += 5
656
657        # Phone cannot connected to basestation of callbox
658        raise RuntimeError(
659            f'Phone was unable to connect to cell: {cell_type}-{cell_number}')
660
661    def wait_until_attached(self, dut, timeout, attach_retries):
662        """Waits until the DUT is attached to all required cells.
663
664        Args:
665            dut: a CellularAndroid controller.
666            timeout: sleep interval,
667                wait for device to camp in 1 try.
668            attach_retries: number of retry
669                to wait for device
670                to connect to 1 basestation.
671        """
672        # get cell info
673        first_cell_type = self.cells[0][self._KEY_CELL_TYPE]
674        first_cell_number = self.cells[0][self._KEY_CELL_NUMBER]
675        if len(self.cells) == 2:
676            second_cell_type = self.cells[1][self._KEY_CELL_TYPE]
677            second_cell_number = self.cells[1][self._KEY_CELL_NUMBER]
678
679        # connect to 1st cell
680        self.wait_until_attached_one_cell(first_cell_type,
681                                          first_cell_number, dut, timeout,
682                                          attach_retries)
683
684        # aggregation to NR
685        if len(self.cells) == 2:
686            self.turn_cell_on(
687                second_cell_type,
688                second_cell_number,
689            )
690
691            for _ in range(1, attach_retries+1):
692                self.log.info('Try to aggregate to NR.')
693                self._socket_send_SCPI_command(
694                    'BSE:CONFig:LTE:CELL1:CAGGregation:AGGRegate:NRCC:DL None')
695                self._socket_send_SCPI_command(
696                    'BSE:CONFig:LTE:CELL1:CAGGregation:AGGRegate:NRCC:UL None')
697                self._socket_send_SCPI_command(
698                    'BSE:CONFig:LTE:CELL1:CAGGregation:AGGRegate:NRCC:DL CELL1')
699                self._socket_send_SCPI_command(
700                    'BSE:CONFig:LTE:CELL1:CAGGregation:AGGRegate:NRCC:DL CELL1')
701                time.sleep(1)
702                self._socket_send_SCPI_command(
703                    "BSE:CONFig:LTE:CELL1:CAGGregation:AGGRegate:NRCC:APPly")
704                # wait for status stable
705                time.sleep(10)
706                cell_state = self.get_cell_status(second_cell_type, second_cell_number)
707                self.log.info(f'cell state: {cell_state}')
708                if cell_state == 'CONN\n':
709                    return
710                else:
711                    self.turn_cell_off(second_cell_type, second_cell_number)
712                    # wait for LTE cell to connect again
713                    self.wait_until_attached_one_cell(first_cell_type,
714                                            first_cell_number, dut, 120,
715                                            2)
716
717            raise RuntimeError(f'Fail to aggregate to NR from LTE.')
718
719    def modify_dl_ul_mac_padding(self):
720        """Disables dl/ul mac padding packets."""
721        self.log.info('modifying dl ul mac padding')
722        self._socket_send_SCPI_command(self._SCPI_CHANGE_DL_TDOMAIN)
723        self._socket_send_SCPI_command(self._SCPI_CHANGE_UL_TDOMAIN)
724
725    def set_lte_rrc_state_change_timer(self, enabled, time=10):
726        """Configures the LTE RRC state change timer.
727
728        Args:
729            enabled: a boolean indicating if the timer should be on or off.
730            time: time in seconds for the timer to expire.
731        """
732        raise NotImplementedError(
733            'This UXM callbox simulator does not support this feature.')
734
735    def set_band(self, bts_index, band):
736        """Sets the band for the indicated base station.
737
738        Args:
739            bts_index: the base station number.
740            band: the new band.
741        """
742        raise NotImplementedError(
743            'This UXM callbox simulator does not support this feature.')
744
745    def get_duplex_mode(self, band):
746        """Determines if the band uses FDD or TDD duplex mode
747
748        Args:
749            band: a band number.
750
751        Returns:
752            an variable of class DuplexMode indicating if band is FDD or TDD.
753        """
754        raise NotImplementedError(
755            'This UXM callbox simulator does not support this feature.')
756
757    def set_input_power(self, bts_index, input_power):
758        """Sets the input power for the indicated base station.
759
760        Args:
761            bts_index: the base station number.
762            input_power: the new input power.
763        """
764        raise NotImplementedError(
765            'This UXM callbox simulator does not support this feature.')
766
767    def set_output_power(self, bts_index, output_power):
768        """Sets the output power for the indicated base station.
769
770        Args:
771            bts_index: the base station number.
772            output_power: the new output power.
773        """
774        raise NotImplementedError(
775            'This UXM callbox simulator does not support this feature.')
776
777    def set_tdd_config(self, bts_index, tdd_config):
778        """Sets the tdd configuration number for the indicated base station.
779
780        Args:
781            bts_index: the base station number.
782            tdd_config: the new tdd configuration number.
783        """
784        raise NotImplementedError(
785            'This UXM callbox simulator does not support this feature.')
786
787    def set_ssf_config(self, bts_index, ssf_config):
788        """Sets the Special Sub-Frame config number for the indicated.
789
790        base station.
791
792        Args:
793            bts_index: the base station number.
794            ssf_config: the new ssf config number.
795        """
796        raise NotImplementedError(
797            'This UXM callbox simulator does not support this feature.')
798
799    def set_bandwidth(self, bts_index, bandwidth):
800        """Sets the bandwidth for the indicated base station.
801
802        Args:
803            bts_index: the base station number
804            bandwidth: the new bandwidth
805        """
806        raise NotImplementedError(
807            'This UXM callbox simulator does not support this feature.')
808
809    def set_downlink_channel_number(self, bts_index, channel_number):
810        """Sets the downlink channel number for the indicated base station.
811
812        Args:
813            bts_index: the base station number.
814            channel_number: the new channel number.
815        """
816        raise NotImplementedError(
817            'This UXM callbox simulator does not support this feature.')
818
819    def set_mimo_mode(self, bts_index, mimo_mode):
820        """Sets the mimo mode for the indicated base station.
821
822        Args:
823            bts_index: the base station number
824            mimo_mode: the new mimo mode
825        """
826        raise NotImplementedError(
827            'This UXM callbox simulator does not support this feature.')
828
829    def set_transmission_mode(self, bts_index, tmode):
830        """Sets the transmission mode for the indicated base station.
831
832        Args:
833            bts_index: the base station number.
834            tmode: the new transmission mode.
835        """
836        raise NotImplementedError(
837            'This UXM callbox simulator does not support this feature.')
838
839    def set_scheduling_mode(self,
840                            bts_index,
841                            scheduling,
842                            mcs_dl=None,
843                            mcs_ul=None,
844                            nrb_dl=None,
845                            nrb_ul=None):
846        """Sets the scheduling mode for the indicated base station.
847
848        Args:
849            bts_index: the base station number.
850            scheduling: the new scheduling mode.
851            mcs_dl: Downlink MCS.
852            mcs_ul: Uplink MCS.
853            nrb_dl: Number of RBs for downlink.
854            nrb_ul: Number of RBs for uplink.
855        """
856        raise NotImplementedError(
857            'This UXM callbox simulator does not support this feature.')
858
859    def set_dl_256_qam_enabled(self, bts_index, enabled):
860        """Determines what MCS table should be used for the downlink.
861
862        This only saves the setting that will be used when configuring MCS.
863
864        Args:
865            bts_index: the base station number.
866            enabled: whether 256 QAM should be used.
867        """
868        raise NotImplementedError(
869            'This UXM callbox simulator does not support this feature.')
870
871    def set_ul_64_qam_enabled(self, bts_index, enabled):
872        """Determines what MCS table should be used for the uplink.
873
874        This only saves the setting that will be used when configuring MCS.
875
876        Args:
877            bts_index: the base station number.
878            enabled: whether 64 QAM should be used.
879        """
880        raise NotImplementedError(
881            'This UXM callbox simulator does not support this feature.')
882
883    def set_mac_padding(self, bts_index, mac_padding):
884        """Enables or disables MAC padding in the indicated base station.
885
886        Args:
887            bts_index: the base station number.
888            mac_padding: the new MAC padding setting.
889        """
890        raise NotImplementedError(
891            'This UXM callbox simulator does not support this feature.')
892
893    def set_cfi(self, bts_index, cfi):
894        """Sets the Channel Format Indicator for the indicated base station.
895
896        Args:
897            bts_index: the base station number.
898            cfi: the new CFI setting.
899        """
900        raise NotImplementedError(
901            'This UXM callbox simulator does not support this feature.')
902
903    def set_paging_cycle(self, bts_index, cycle_duration):
904        """Sets the paging cycle duration for the indicated base station.
905
906        Args:
907            bts_index: the base station number.
908            cycle_duration: the new paging cycle duration in milliseconds.
909        """
910        raise NotImplementedError(
911            'This UXM callbox simulator does not support this feature.')
912
913    def set_phich_resource(self, bts_index, phich):
914        """Sets the PHICH Resource setting for the indicated base station.
915
916        Args:
917            bts_index: the base station number.
918            phich: the new PHICH resource setting.
919        """
920        raise NotImplementedError(
921            'This UXM callbox simulator does not support this feature.')
922
923    def lte_attach_secondary_carriers(self, ue_capability_enquiry):
924        """Activates the secondary carriers for CA.
925
926        Requires the DUT to be attached to the primary carrier first.
927
928        Args:
929            ue_capability_enquiry: UE capability enquiry message to be sent to
930              the UE before starting carrier aggregation.
931        """
932        raise NotImplementedError(
933            'This UXM callbox simulator does not support this feature.')
934
935    def wait_until_communication_state(self, timeout=120):
936        """Waits until the DUT is in Communication state.
937
938        Args:
939            timeout: after this amount of time the method will raise
940                a CellularSimulatorError exception. Default is 120 seconds.
941        """
942        raise NotImplementedError(
943            'This UXM callbox simulator does not support this feature.')
944
945    def wait_until_idle_state(self, timeout=120):
946        """Waits until the DUT is in Idle state.
947
948        Args:
949            timeout: after this amount of time the method will raise a
950                CellularSimulatorError exception. Default is 120 seconds.
951        """
952        # turn on RRC release
953        cell_type = self.cells[0][self._KEY_CELL_TYPE]
954        cell_number = self.cells[0][self._KEY_CELL_NUMBER]
955
956        # choose cmd base on cell type
957        cmd = None
958        if cell_type == 'LTE':
959            cmd = self.SCPI_RRC_RELEASE_LTE_CMD
960        else:
961            cmd = self.SCPI_RRC_RELEASE_NR_CMD
962
963        if not cmd:
964            raise RuntimeError(f'Cell type [{cell_type}] is not supporting IDLE.')
965
966        # checking status
967        self.log.info('Wait for IDLE state.')
968        for _ in range(5):
969            cell_state = self.get_cell_status(cell_type, cell_number)
970            self.log.info(f'cell state: {cell_state}')
971            if cell_state == 'CONN\n':
972                # RRC release
973                self._socket_send_SCPI_command(cmd.format(cell_type, cell_number))
974                # wait for status stable
975                time.sleep(60)
976            elif cell_state == 'IDLE\n':
977                return
978
979        raise RuntimeError('RRC release fail.')
980
981    def detach(self):
982        """ Turns off all the base stations so the DUT loose connection."""
983        for cell in self.cells:
984            cell_type = cell[self._KEY_CELL_TYPE]
985            cell_number = cell[self._KEY_CELL_NUMBER]
986            self.turn_cell_off(cell_type, cell_number)
987            time.sleep(5)
988
989    def stop(self):
990        """Stops current simulation.
991
992        After calling this method, the simulator will need to be set up again.
993        """
994        raise NotImplementedError(
995            'This UXM callbox simulator does not support this feature.')
996
997    def start_data_traffic(self):
998        """Starts transmitting data from the instrument to the DUT. """
999        raise NotImplementedError(
1000            'This UXM callbox simulator does not support this feature.')
1001
1002    def stop_data_traffic(self):
1003        """Stops transmitting data from the instrument to the DUT. """
1004        raise NotImplementedError(
1005            'This UXM callbox simulator does not support this feature.')
1006