• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#   Copyright 2022 - The Android Open Source Project
2#
3#   Licensed under the Apache License, Version 2.0 (the 'License');
4#   you may not use this file except in compliance with the License.
5#   You may obtain a copy of the License at
6#
7#       http://www.apache.org/licenses/LICENSE-2.0
8#
9#   Unless required by applicable law or agreed to in writing, software
10#   distributed under the License is distributed on an 'AS IS' BASIS,
11#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12#   See the License for the specific language governing permissions and
13#   limitations under the License.
14import re
15import logging
16import os
17import paramiko
18import socket
19import time
20
21from acts.controllers.cellular_simulator import AbstractCellularSimulator
22
class SocketWrapper():
    """A wrapper for socket communication with test equipment.

    The connection is opened lazily by the first send and is opened again
    by the next send after close() has been called.

    Attributes:
        _socket: a socket object, or None while no connection is open.
        _ip: a string value for ip address
            which we want to connect.
        _port: an integer for port
            which we want to connect.
        _connecting_timeout: an integer for socket connecting timeout.
            socket.create_connection() leaves this timeout set on the
            socket, so it also bounds later recv() calls.
        _encode_format: a string specify encoding format.
        _cmd_terminator: a character indicates the end of command/data
            which need to be sent.
        _buff_size: maximum number of bytes read for one response.
        _logger: module logger used to report receive timeouts.
    """

    def __init__(self, ip, port,
                 connecting_timeout=120,
                 cmd_terminator='\n',
                 encode_format='utf-8',
                 buff_size=1024):
        self._socket = None
        self._ip = ip
        self._port = port
        self._connecting_timeout = connecting_timeout
        self._cmd_terminator = cmd_terminator
        self._encode_format = encode_format
        self._buff_size = buff_size
        self._logger = logging.getLogger(__name__)

    def _connect(self):
        """Open the TCP connection to the configured ip and port."""
        self._socket = socket.create_connection(
            (self._ip, self._port), timeout=self._connecting_timeout
        )

    def send_command(self, cmd: str):
        """Send a command, connecting first if needed.

        The command terminator is appended when a non-empty command does
        not already end with it.

        Args:
            cmd: a string command to be sent.
        """
        if not self._socket:
            self._connect()
        if cmd and not cmd.endswith(self._cmd_terminator):
            cmd = cmd + self._cmd_terminator
        self._socket.sendall(cmd.encode(self._encode_format))

    def send_command_recv(self, cmd: str) -> str:
        """Send data and wait for response

        Args:
            cmd: a string command to be sent.

        Returns:
            a string response (at most _buff_size bytes are read).

        Raises:
            socket.timeout: no response arrived in time; the connection is
                closed before the exception is re-raised.
        """
        self.send_command(cmd)
        try:
            raw = self._socket.recv(self._buff_size)
        except socket.timeout:
            self._logger.info('Socket timeout while receiving response.')
            self.close()
            raise
        return raw.decode(self._encode_format)

    def close(self):
        """Close the connection if one is open.

        Safe to call when nothing is connected and safe to call repeatedly.
        """
        if self._socket is None:
            return
        try:
            self._socket.close()
        finally:
            # Always forget the socket so the next send reconnects.
            self._socket = None
89
class UXMCellularSimulator(AbstractCellularSimulator):
    """A cellular simulator for UXM callbox.

    Talks to the Keysight Test Application (TA) with SCPI commands over a
    TCP socket, to the HCCU over a second socket, and to the Windows host
    running them over SSH.
    """

    # Keys to obtain data from cell_info dictionary.
    KEY_CELL_NUMBER = "cell_number"
    KEY_CELL_TYPE = "cell_type"

    # UXM socket port
    UXM_SOCKET_PORT = 5125

    # UXM SCPI COMMAND
    # NOTE(review): some of these constants end with '\n' and some do not.
    SCPI_IMPORT_STATUS_QUERY_CMD = 'SYSTem:SCPI:IMPort:STATus?'
    SCPI_SYSTEM_ERROR_CHECK_CMD = 'SYST:ERR?\n'
    SCPI_CHECK_CONNECTION_CMD = '*IDN?\n'
    SCPI_DEREGISTER_UE_IMS = 'SYSTem:IMS:SERVer:UE:DERegister'
    # require: path to SCPI file
    SCPI_IMPORT_SCPI_FILE_CMD = 'SYSTem:SCPI:IMPort "{}"\n'
    # require: 1. cell type (E.g. NR5G), 2. cell number (E.g CELL1)
    SCPI_CELL_ON_CMD = 'BSE:CONFig:{}:{}:ACTive 1'
    SCPI_CELL_OFF_CMD = 'BSE:CONFig:{}:{}:ACTive 0'
    SCPI_GET_CELL_STATUS = 'BSE:STATus:{}:{}?'
    SCPI_RRC_RELEASE_LTE_CMD = 'BSE:FUNCtion:{}:{}:RELease:SEND'
    SCPI_RRC_RELEASE_NR_CMD = 'BSE:CONFig:{}:{}:RCONtrol:RRC:STARt RRELease'
    # require cell number
    SCPI_CREATE_DEDICATED_BEARER = 'BSE:FUNCtion:LTE:{}:NAS:EBID10:DEDicated:CREate'
    # require: sim type (set_sim_type passes 'KEYSight' or 'TEST3GPP')
    SCPI_CHANGE_SIM_NR_CMD = 'BSE:CONFig:NR5G:CELL1:SECurity:AUTHenticate:KEY:TYPE {}'
    SCPI_CHANGE_SIM_LTE_CMD = 'BSE:CONFig:LTE:SECurity:AUTHenticate:KEY {}'
    SCPI_SETTINGS_PRESET_CMD = 'SYSTem:PRESet:FULL'

    # UXM's Test Application recovery
    # Seconds to sleep after launching the TA before probing its socket.
    TA_BOOT_TIME = 100

    # ssh command
    SSH_START_GUI_APP_CMD_FORMAT = 'psexec -s -d -i 1 "{exe_path}"'
    SSH_CHECK_APP_RUNNING_CMD_FORMAT = 'tasklist | findstr /R {regex_app_name}'
    SSH_KILL_PROCESS_BY_NAME = 'taskkill /IM {process_name} /F'
    UXM_TEST_APP_NAME = 'TestApp.exe'

    # start process success regex
    # NOTE(review): used with re.search, where ' *' matches zero or more
    # spaces rather than arbitrary text; presumably '.*' was intended so a
    # host name can appear between 'on' and 'with' - confirm.
    PSEXEC_PROC_STARTED_REGEX_FORMAT = 'started on * with process ID {proc_id}'

    # HCCU default value
    HCCU_SOCKET_PORT = 4882
    # Setup name is sent as '#<number_of_digit><setup_name_len><setup_name>',
    # where number_of_digit is the number of digits of setup_name_len.
    HCCU_SCPI_CHANGE_SETUP_CMD = ':SYSTem:SETup:CONFig #{number_of_digit}{setup_name_len}{setup_name}'
    HCCU_SCPI_CHANGE_SCENARIO_CMD = ':SETup:SCENe "((NE_1, {scenario_name}))"'
    HCCU_STATUS_CHECK_CMD = ':SETup:INSTrument:STATus? 0\n'
    HCCU_FR2_SETUP_NAME = '{Name:"TSPC_1UXM5G_HF_2RRH_M1740A"}'
    HCCU_FR1_SETUP_NAME = '{Name:"TSPC_1UXM5G_LF"}'
    HCCU_GET_INSTRUMENT_COUNT_CMD = ':SETup:INSTrument:COUNt?'
    # Instrument counts that identify which setup is currently loaded.
    HCCU_FR2_INSTRUMENT_COUNT = 5
    HCCU_FR1_INSTRUMENT_COUNT = 2
    HCCU_FR2_SCENARIO = 'NR_4DL2x2_2UL2x2_LTE_4CC'
    HCCU_FR1_SCENARIO = 'NR_1DL4x4_1UL2x2_LTE_4CC'
144
145
146    def __init__(self, ip_address, custom_files,uxm_user,
147                 ssh_private_key_to_uxm, ta_exe_path, ta_exe_name):
148        """Initializes the cellular simulator.
149
150        Args:
151            ip_address: the ip address of host where Keysight Test Application (TA)
152                is installed.
153            custom_files: a list of file path for custom files.
154            uxm_user: username of host where Keysight TA resides.
155            ssh_private_key_to_uxm: private key for key based ssh to
156                host where Keysight TA resides.
157            ta_exe_path: path to TA exe.
158            ta_exe_name: name of TA exe.
159        """
160        super().__init__()
161        self.custom_files = custom_files
162        self.rockbottom_script = None
163        self.cells = []
164        self.uxm_ip = ip_address
165        self.uxm_user = uxm_user
166        self.ssh_private_key_to_uxm = os.path.expanduser(
167                                        ssh_private_key_to_uxm)
168        self.ta_exe_path = ta_exe_path
169        self.ta_exe_name = ta_exe_name
170        self.ssh_client = self.create_ssh_client()
171
172        # get roclbottom file
173        for file in self.custom_files:
174            if 'rockbottom_' in file:
175                self.rockbottom_script = file
176
177        # connect to Keysight Test Application via socket
178        self.recovery_ta()
179        self.socket = self._socket_connect(self.uxm_ip, self.UXM_SOCKET_PORT)
180        self.check_socket_connection()
181        self.timeout = 120
182
183        # hccu socket
184        self.hccu_socket_port = self.HCCU_SOCKET_PORT
185        self.hccu_socket = SocketWrapper(self.uxm_ip, self.hccu_socket_port)
186
187    def socket_connect(self):
188        self.socket = self._socket_connect(self.uxm_ip, self.UXM_SOCKET_PORT)
189
190    def switch_HCCU_scenario(self, scenario_name: str):
191        cmd = self.HCCU_SCPI_CHANGE_SCENARIO_CMD.format(
192            scenario_name=scenario_name)
193        self.hccu_socket.send_command(cmd)
194        self.log.debug(f'Sent command: {cmd}')
195        # this is require for the command to take effect
196        # because hccu's port need to be free.
197        self.hccu_socket.close()
198
199    def switch_HCCU_setup(self, setup_name: str):
200        """Change HHCU system setup.
201
202        Args:
203            setup_name: a string name
204                of the system setup will be changed to.
205        """
206        setup_name_len = str(len(setup_name))
207        number_of_digit = str(len(setup_name_len))
208        cmd = self.HCCU_SCPI_CHANGE_SETUP_CMD.format(
209            number_of_digit=number_of_digit,
210            setup_name_len=setup_name_len,
211            setup_name=setup_name
212        )
213        self.hccu_socket.send_command(cmd)
214        self.log.debug(f'Sent command: {cmd}')
215        # this is require for the command to take effect
216        # because hccu's port need to be free.
217        self.hccu_socket.close()
218
219    def wait_until_hccu_operational(self, timeout=1200):
220        """ Wait for hccu is ready to operate for a specified timeout.
221
222        Args:
223            timeout: time we are waiting for
224                hccu in opertional status.
225
226        Returns:
227            True if HCCU status is operational within timeout.
228            False otherwise.
229        """
230        # check status
231        self.log.info('Waiting for HCCU to ready to operate.')
232        cmd = self.HCCU_STATUS_CHECK_CMD
233        t = 0
234        interval = 10
235        while t < timeout:
236            response = self.hccu_socket.send_command_recv(cmd)
237            if response == 'OPER\n':
238                return True
239            time.sleep(interval)
240            t += interval
241        return False
242
243    def switch_HCCU_settings(self, is_fr2: bool):
244        """Set HCCU setup configuration.
245
246        HCCU stands for Hardware Configuration Control Utility,
247        an interface allows us to control Keysight Test Equipment.
248
249        Args:
250            is_fr2: a bool value.
251        """
252        # change HCCU configration
253        data = ''
254        scenario_name = ''
255        instrument_count_res = self.hccu_socket.send_command_recv(
256            self.HCCU_GET_INSTRUMENT_COUNT_CMD)
257        instrument_count = int(instrument_count_res)
258        # if hccu setup is correct, no need to change.
259        if is_fr2 and instrument_count == self.HCCU_FR2_INSTRUMENT_COUNT:
260            self.log.info('UXM has correct HCCU setup.')
261            return
262        if not is_fr2 and instrument_count == self.HCCU_FR1_INSTRUMENT_COUNT:
263            self.log.info('UXM has correct HCCU setup.')
264            return
265
266        self.log.info('UXM has incorrect HCCU setup, start changing setup.')
267        # terminate TA and close socket
268        self.log.info('Terminate TA before switch HCCU settings.')
269        self.terminate_process(self.UXM_TEST_APP_NAME)
270        self.socket.close()
271
272        # change hccu setup
273        if is_fr2:
274            data = self.HCCU_FR2_SETUP_NAME
275            scenario_name = self.HCCU_FR2_SCENARIO
276        else:
277            data = self.HCCU_FR1_SETUP_NAME
278            scenario_name = self.HCCU_FR1_SCENARIO
279        self.log.info('Switch HCCU setup.')
280        self.switch_HCCU_setup(data)
281        time.sleep(10)
282        if not self.wait_until_hccu_operational():
283            raise RuntimeError('Fail to switch HCCU setup.')
284
285        # change scenario
286        self.log.info('Ativate HCCU scenario.')
287        self.switch_HCCU_scenario(scenario_name)
288        time.sleep(40)
289        if not self.wait_until_hccu_operational():
290            raise RuntimeError('Fail to switch HCCU scenario.')
291
292        # start TA and reconnect socket.
293        self.recovery_ta()
294        self.socket = self._socket_connect(self.uxm_ip, self.UXM_SOCKET_PORT)
295
296    def create_ssh_client(self):
297        """Create a ssh client to host."""
298        ssh = paramiko.SSHClient()
299        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
300        mykey = paramiko.Ed25519Key.from_private_key_file(
301            self.ssh_private_key_to_uxm)
302        ssh.connect(hostname=self.uxm_ip, username=self.uxm_user, pkey=mykey)
303        self.log.info('SSH client to %s is connected' % self.uxm_ip)
304        return ssh
305
306    def terminate_process(self, process_name):
307        cmd = self.SSH_KILL_PROCESS_BY_NAME.format(
308            process_name=process_name
309        )
310        stdin, stdout, stderr = self.ssh_client.exec_command(cmd)
311        stdin.close()
312        err = ''.join(stderr.readlines())
313        out = ''.join(stdout.readlines())
314        final_output = str(out) + str(err)
315        self.log.info(final_output)
316        return out
317
318    def is_ta_running(self):
319        is_running_cmd = self.SSH_CHECK_APP_RUNNING_CMD_FORMAT.format(
320            regex_app_name=self.ta_exe_name)
321        stdin, stdout, stderr = self.ssh_client.exec_command(is_running_cmd)
322        stdin.close()
323        err = ''.join(stderr.readlines())
324        out = ''.join(stdout.readlines())
325        final_output = str(out) + str(err)
326        self.log.info(final_output)
327        return (out != '' and err == '')
328
329    def _start_test_app(self):
330        """Start Test Application on Windows."""
331        # start GUI exe via ssh
332        start_app_cmd = self.SSH_START_GUI_APP_CMD_FORMAT.format(
333            exe_path=self.ta_exe_path)
334        stdin, stdout, stderr = self.ssh_client.exec_command(start_app_cmd)
335        self.log.info(f'Command sent to {self.uxm_ip}: {start_app_cmd}')
336        stdin.close()
337        err = ''.join(stderr.readlines())
338        out = ''.join(stdout.readlines())
339        # psexec return process ID as part of the exit code
340        exit_status = stderr.channel.recv_exit_status()
341        is_started = re.search(
342            self.PSEXEC_PROC_STARTED_REGEX_FORMAT.format(proc_id=exit_status),
343            err[-1])
344        if is_started:
345            raise RuntimeError('Fail to start TA: ' + out + err)
346        # wait for ta completely boot up
347        self.log.info('TA is starting')
348        time.sleep(self.TA_BOOT_TIME)
349
350    def recovery_ta(self):
351        """Start TA if it is not running."""
352        if not self.is_ta_running():
353            self._start_test_app()
354            # checking if ta booting process complete
355            # by checking socket connection
356            s = None
357            retries = 12
358            for _ in range(retries):
359                try:
360                    s = self._socket_connect(self.uxm_ip, self.UXM_SOCKET_PORT)
361                    s.close()
362                    return
363                except ConnectionRefusedError as cre:
364                    self.log.info(
365                        'Connection refused, wait 10s for TA to boot')
366                    time.sleep(10)
367            raise RuntimeError('TA does not start on time')
368
369    def set_rockbottom_script_path(self, path):
370        """Set path to rockbottom script.
371
372        Args:
373            path: path to rockbottom script.
374        """
375        self.rockbottom_script = path
376
377    def set_cell_info(self, cell_info):
378        """Set type and number for multiple cells.
379
380        Args:
381            cell_info: list of dictionaries,
382                each dictionary contain cell type
383                and cell number for each cell
384                that the simulator need to control.
385        """
386        if not cell_info:
387            raise ValueError('Missing cell info from configurations file')
388        self.cells = cell_info
389
390    def deregister_ue_ims(self):
391        """Remove UE IMS profile from UXM."""
392        self._socket_send_SCPI_command(
393                self.SCPI_DEREGISTER_UE_IMS)
394
395    def create_dedicated_bearer(self):
396        """Create a dedicated bearer setup for ims call.
397
398        After UE connected and register on UXM IMS tab.
399        It is required to create a dedicated bearer setup
400        with EPS bearer ID 10.
401        """
402        cell_number = self.cells[0][self.KEY_CELL_NUMBER]
403        self._socket_send_SCPI_command(
404                self.SCPI_CREATE_DEDICATED_BEARER.format(cell_number))
405
406    def turn_cell_on(self, cell_type, cell_number):
407        """Turn UXM's cell on.
408
409        Args:
410            cell_type: type of cell (e.g NR5G, LTE).
411            cell_number: ordinal number of a cell.
412        """
413        if cell_type and cell_number:
414            self._socket_send_SCPI_command(
415                self.SCPI_CELL_ON_CMD.format(cell_type, cell_number))
416        else:
417            raise ValueError('Invalid cell info\n' +
418                             f' cell type: {cell_type}\n' +
419                             f' cell number: {cell_number}\n')
420
421    def turn_cell_off(self, cell_type, cell_number):
422        """Turn UXM's cell off.
423
424        Args:
425            cell_type: type of cell (e.g NR5G, LTE).
426            cell_number: ordinal number of a cell.
427        """
428        if cell_type and cell_number:
429            self._socket_send_SCPI_command(
430                self.SCPI_CELL_OFF_CMD.format(cell_type, cell_number))
431        else:
432            raise ValueError('Invalid cell info\n' +
433                             f' cell type: {cell_type}\n' +
434                             f' cell number: {cell_number}\n')
435
436    def get_cell_status(self, cell_type, cell_number):
437        """Get status of cell.
438
439        Args:
440            cell_type: type of cell (e.g NR5G, LTE).
441            cell_number: ordinal number of a cell.
442        """
443        if not cell_type or not cell_number:
444            raise ValueError('Invalid cell with\n' +
445                             f' cell type: {cell_type}\n' +
446                             f' cell number: {cell_number}\n')
447
448        return self._socket_send_SCPI_for_result_command(
449            self.SCPI_GET_CELL_STATUS.format(cell_type, cell_number))
450
451    def check_socket_connection(self):
452        """Check if the socket connection is established.
453
454        Query the identification of the Keysight Test Application
455        we are trying to connect to. Empty response indicates
456        connection fail, and vice versa.
457        """
458        self.socket.sendall(self.SCPI_CHECK_CONNECTION_CMD.encode())
459        response = self.socket.recv(1024).decode()
460        if response:
461            self.log.info(f'Connected to: {response}')
462        else:
463            self.log.error('Fail to connect to callbox')
464
465    def _socket_connect(self, host, port):
466        """Create socket connection.
467
468        Args:
469            host: IP address of desktop where Keysight Test Application resides.
470            port: port that Keysight Test Application is listening for socket
471                communication.
472        Returns:
473            s: socket object.
474        """
475        self.log.info('Establishing connection to callbox via socket')
476        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
477        s.connect((host, port))
478        return s
479
480    def _socket_send_SCPI_command(self, command):
481        """Send SCPI command without expecting response.
482
483        Args:
484            command: a string SCPI command.
485        """
486        # make sure there is a line break for the socket to send command
487        command = command + '\n'
488        # send command
489        self.socket.sendall(command.encode())
490        self.log.info(f'Sent {command}')
491
492    def _socket_receive_SCPI_result(self):
493        """Receive response from socket. """
494        i = 1
495        response = ''
496        while i < self.timeout and not response:
497            response = self.socket.recv(1024).decode()
498            i += 1
499        return response
500
501    def _socket_send_SCPI_for_result_command(self, command):
502        """Send SCPI command and expecting response.
503
504        Args:
505            command: a string SCPI command.
506        """
507        self._socket_send_SCPI_command(command)
508        response = self._socket_receive_SCPI_result()
509        return response
510
511    def check_system_error(self):
512        """Query system error from Keysight Test Application.
513
514        Returns:
515            status: a message indicate the number of errors
516                and detail of errors if any.
517                a string `0,"No error"` indicates no error.
518        """
519        status = self._socket_send_SCPI_for_result_command(
520            self.SCPI_SYSTEM_ERROR_CHECK_CMD)
521        self.log.info(f'System error status: {status}')
522        return status
523
524    def import_configuration(self, path):
525        """Import SCPI config file.
526
527        Args:
528            path: path to SCPI file.
529        """
530        self._socket_send_SCPI_command(
531            self.SCPI_SETTINGS_PRESET_CMD)
532        time.sleep(10)
533        self._socket_send_SCPI_command(
534            self.SCPI_IMPORT_SCPI_FILE_CMD.format(path))
535        time.sleep(45)
536
537    def destroy(self):
538        """Close socket connection with UXM. """
539        self.socket.close()
540
541    def setup_lte_scenario(self, path):
542        """Configures the equipment for an LTE simulation.
543
544        Args:
545            path: path to SCPI config file.
546        """
547        self.import_configuration(path)
548
549    def dut_rockbottom(self, dut):
550        """Set the dut to rockbottom state.
551
552        Args:
553            dut: a CellularAndroid controller.
554        """
555        # The rockbottom script might include a device reboot, so it is
556        # necessary to stop SL4A during its execution.
557        dut.ad.stop_services()
558        self.log.info('Executing rockbottom script for ' + dut.ad.model)
559        os.chmod(self.rockbottom_script, 0o777)
560        os.system('{} {}'.format(self.rockbottom_script, dut.ad.serial))
561        # Make sure the DUT is in root mode after coming back
562        dut.ad.root_adb()
563        # Restart SL4A
564        dut.ad.start_services()
565
566    def set_sim_type(self, is_3gpp_sim):
567        sim_type = 'KEYSight'
568        if is_3gpp_sim:
569            sim_type = 'TEST3GPP'
570        self._socket_send_SCPI_command(
571            self.SCPI_CHANGE_SIM_NR_CMD.format(sim_type))
572        time.sleep(2)
573        self._socket_send_SCPI_command(
574            self.SCPI_CHANGE_SIM_LTE_CMD.format(sim_type))
575        time.sleep(2)
576
    def wait_until_attached_one_cell(self,
                                     cell_type,
                                     cell_number,
                                     dut,
                                     wait_for_camp_interval,
                                     attach_retries):
        """Wait until connect to given UXM cell.

        After turning airplane mode off, poll the cell status for up to
        wait_for_camp_interval seconds for the device to camp.
        If the device is not connected after the wait,
        either toggle airplane mode and the cell off/on or reboot the
        device, then try again.

        Args:
            cell_type: type of cell
                which we are trying to connect to.
            cell_number: ordinal number of a cell
                which we are trying to connect to.
            dut: a AndroidCellular controller.
            wait_for_camp_interval: sleep interval,
                wait for device to camp.
            attach_retries: number of retry
                to wait for device
                to connect to 1 basestation.

        Returns:
            True once the cell reports 'CONN' on two consecutive checks
            taken 15 seconds apart.

        Raise:
            RuntimeError: device unable to connect to cell.
        """
        # airplane mode on
        dut.toggle_airplane_mode(True)
        time.sleep(5)

        # turn cell on
        self.turn_cell_on(cell_type, cell_number)
        time.sleep(5)

        # polling period in seconds; grows by 5 after every failed attempt
        interval = 10
        # waits for device to camp
        # NOTE(review): range(1, attach_retries) runs attach_retries - 1
        # attempts - confirm whether that is intended.
        for index in range(1, attach_retries):
            count = 0
            # airplane mode off
            dut.toggle_airplane_mode(False)
            time.sleep(5)
            # check connection in small interval
            while count < wait_for_camp_interval:
                time.sleep(interval)
                cell_state = self.get_cell_status(cell_type, cell_number)
                self.log.info(f'cell state: {cell_state}')
                if cell_state == 'CONN\n':
                    # wait for connection stable
                    time.sleep(15)
                    # check connection status again
                    cell_state = self.get_cell_status(cell_type, cell_number)
                    self.log.info(f'cell state: {cell_state}')
                    if cell_state == 'CONN\n':
                        return True
                # the cell is switched back on if it reports off
                if cell_state == 'OFF\n':
                    self.turn_cell_on(cell_type, cell_number)
                    time.sleep(5)
                # only the polling sleep is counted against the wait budget
                count += interval

            # reboot device (even-numbered attempts)
            if (index % 2) == 0:
                dut.ad.reboot()
                if self.rockbottom_script:
                    self.dut_rockbottom(dut)
                else:
                    self.log.warning(
                        f'Rockbottom script was not executed after reboot.'
                    )
            # toggle APM and cell on/off
            # (index % 1) is always 0, so this branch handles every
            # odd-numbered attempt.
            elif (index % 1) == 0:
                # Toggle APM on
                dut.toggle_airplane_mode(True)
                time.sleep(5)

                # Toggle simulator cell
                self.turn_cell_off(cell_type, cell_number)
                time.sleep(5)
                self.turn_cell_on(cell_type, cell_number)
                time.sleep(5)

                # Toggle APM off
                dut.toggle_airplane_mode(False)
                time.sleep(5)
            # increase length of small waiting interval
            interval += 5

        # Phone could not connect to the basestation of the callbox
        raise RuntimeError(
            f'Phone was unable to connect to cell: {cell_type}-{cell_number}')
666
    def wait_until_attached(self, dut, timeout, attach_retries):
        """Waits until the DUT is attached to all required cells.

        The first configured cell is attached first. When exactly two
        cells are configured, the second cell is turned on and NR
        aggregation is applied from LTE CELL1 until the second cell
        reports 'CONN'.

        Args:
            dut: a CellularAndroid controller.
            timeout: sleep interval,
                wait for device to camp in 1 try.
            attach_retries: number of retry
                to wait for device
                to connect to 1 basestation.

        Raises:
            RuntimeError: the first cell could not be attached, or the
                aggregation attempts ran out.
        """
        # get cell info
        first_cell_type = self.cells[0][self.KEY_CELL_TYPE]
        first_cell_number = self.cells[0][self.KEY_CELL_NUMBER]
        # second cell info is only read when exactly two cells are set
        if len(self.cells) == 2:
            second_cell_type = self.cells[1][self.KEY_CELL_TYPE]
            second_cell_number = self.cells[1][self.KEY_CELL_NUMBER]

        # connect to 1st cell
        self.wait_until_attached_one_cell(first_cell_type,
                                          first_cell_number, dut, timeout,
                                          attach_retries)

        # aggregation to NR
        if len(self.cells) == 2:
            self.turn_cell_on(
                second_cell_type,
                second_cell_number,
            )

            # range(1, attach_retries) gives attach_retries - 1 attempts
            for _ in range(1, attach_retries):
                self.log.info('Try to aggregate to NR.')
                # clear DL and UL aggregation, then select CELL1 again
                self._socket_send_SCPI_command(
                    'BSE:CONFig:LTE:CELL1:CAGGregation:AGGRegate:NRCC:DL None')
                self._socket_send_SCPI_command(
                    'BSE:CONFig:LTE:CELL1:CAGGregation:AGGRegate:NRCC:UL None')
                self._socket_send_SCPI_command(
                    'BSE:CONFig:LTE:CELL1:CAGGregation:AGGRegate:NRCC:DL CELL1')
                # NOTE(review): the DL CELL1 command is sent twice and UL is
                # never set back to CELL1 - possibly 'UL CELL1' was
                # intended here; confirm.
                self._socket_send_SCPI_command(
                    'BSE:CONFig:LTE:CELL1:CAGGregation:AGGRegate:NRCC:DL CELL1')
                time.sleep(1)
                self._socket_send_SCPI_command(
                    "BSE:CONFig:LTE:CELL1:CAGGregation:AGGRegate:NRCC:APPly")
                # wait for status stable
                time.sleep(10)
                cell_state = self.get_cell_status(second_cell_type, second_cell_number)
                self.log.info(f'cell state: {cell_state}')
                if cell_state == 'CONN\n':
                    return
                else:
                    self.turn_cell_off(second_cell_type, second_cell_number)
                    # wait for LTE cell to connect again
                    # (120 second camp wait, attach_retries of 2)
                    self.wait_until_attached_one_cell(first_cell_type,
                                            first_cell_number, dut, 120,
                                            2)

            raise RuntimeError(f'Fail to aggregate to NR from LTE.')
724
725    def set_lte_rrc_state_change_timer(self, enabled, time=10):
726        """Configures the LTE RRC state change timer.
727
728        Args:
729            enabled: a boolean indicating if the timer should be on or off.
730            time: time in seconds for the timer to expire.
731        """
732        raise NotImplementedError(
733            'This UXM callbox simulator does not support this feature.')
734
735    def set_band(self, bts_index, band):
736        """Sets the band for the indicated base station.
737
738        Args:
739            bts_index: the base station number.
740            band: the new band.
741        """
742        raise NotImplementedError(
743            'This UXM callbox simulator does not support this feature.')
744
745    def get_duplex_mode(self, band):
746        """Determines if the band uses FDD or TDD duplex mode
747
748        Args:
749            band: a band number.
750
751        Returns:
752            an variable of class DuplexMode indicating if band is FDD or TDD.
753        """
754        raise NotImplementedError(
755            'This UXM callbox simulator does not support this feature.')
756
757    def set_input_power(self, bts_index, input_power):
758        """Sets the input power for the indicated base station.
759
760        Args:
761            bts_index: the base station number.
762            input_power: the new input power.
763        """
764        raise NotImplementedError(
765            'This UXM callbox simulator does not support this feature.')
766
767    def set_output_power(self, bts_index, output_power):
768        """Sets the output power for the indicated base station.
769
770        Args:
771            bts_index: the base station number.
772            output_power: the new output power.
773        """
774        raise NotImplementedError(
775            'This UXM callbox simulator does not support this feature.')
776
777    def set_tdd_config(self, bts_index, tdd_config):
778        """Sets the tdd configuration number for the indicated base station.
779
780        Args:
781            bts_index: the base station number.
782            tdd_config: the new tdd configuration number.
783        """
784        raise NotImplementedError(
785            'This UXM callbox simulator does not support this feature.')
786
787    def set_ssf_config(self, bts_index, ssf_config):
788        """Sets the Special Sub-Frame config number for the indicated.
789
790        base station.
791
792        Args:
793            bts_index: the base station number.
794            ssf_config: the new ssf config number.
795        """
796        raise NotImplementedError(
797            'This UXM callbox simulator does not support this feature.')
798
799    def set_bandwidth(self, bts_index, bandwidth):
800        """Sets the bandwidth for the indicated base station.
801
802        Args:
803            bts_index: the base station number
804            bandwidth: the new bandwidth
805        """
806        raise NotImplementedError(
807            'This UXM callbox simulator does not support this feature.')
808
809    def set_downlink_channel_number(self, bts_index, channel_number):
810        """Sets the downlink channel number for the indicated base station.
811
812        Args:
813            bts_index: the base station number.
814            channel_number: the new channel number.
815        """
816        raise NotImplementedError(
817            'This UXM callbox simulator does not support this feature.')
818
819    def set_mimo_mode(self, bts_index, mimo_mode):
820        """Sets the mimo mode for the indicated base station.
821
822        Args:
823            bts_index: the base station number
824            mimo_mode: the new mimo mode
825        """
826        raise NotImplementedError(
827            'This UXM callbox simulator does not support this feature.')
828
829    def set_transmission_mode(self, bts_index, tmode):
830        """Sets the transmission mode for the indicated base station.
831
832        Args:
833            bts_index: the base station number.
834            tmode: the new transmission mode.
835        """
836        raise NotImplementedError(
837            'This UXM callbox simulator does not support this feature.')
838
839    def set_scheduling_mode(self,
840                            bts_index,
841                            scheduling,
842                            mcs_dl=None,
843                            mcs_ul=None,
844                            nrb_dl=None,
845                            nrb_ul=None):
846        """Sets the scheduling mode for the indicated base station.
847
848        Args:
849            bts_index: the base station number.
850            scheduling: the new scheduling mode.
851            mcs_dl: Downlink MCS.
852            mcs_ul: Uplink MCS.
853            nrb_dl: Number of RBs for downlink.
854            nrb_ul: Number of RBs for uplink.
855        """
856        raise NotImplementedError(
857            'This UXM callbox simulator does not support this feature.')
858
859    def set_dl_256_qam_enabled(self, bts_index, enabled):
860        """Determines what MCS table should be used for the downlink.
861
862        This only saves the setting that will be used when configuring MCS.
863
864        Args:
865            bts_index: the base station number.
866            enabled: whether 256 QAM should be used.
867        """
868        raise NotImplementedError(
869            'This UXM callbox simulator does not support this feature.')
870
871    def set_ul_64_qam_enabled(self, bts_index, enabled):
872        """Determines what MCS table should be used for the uplink.
873
874        This only saves the setting that will be used when configuring MCS.
875
876        Args:
877            bts_index: the base station number.
878            enabled: whether 64 QAM should be used.
879        """
880        raise NotImplementedError(
881            'This UXM callbox simulator does not support this feature.')
882
883    def set_mac_padding(self, bts_index, mac_padding):
884        """Enables or disables MAC padding in the indicated base station.
885
886        Args:
887            bts_index: the base station number.
888            mac_padding: the new MAC padding setting.
889        """
890        raise NotImplementedError(
891            'This UXM callbox simulator does not support this feature.')
892
893    def set_cfi(self, bts_index, cfi):
894        """Sets the Channel Format Indicator for the indicated base station.
895
896        Args:
897            bts_index: the base station number.
898            cfi: the new CFI setting.
899        """
900        raise NotImplementedError(
901            'This UXM callbox simulator does not support this feature.')
902
903    def set_paging_cycle(self, bts_index, cycle_duration):
904        """Sets the paging cycle duration for the indicated base station.
905
906        Args:
907            bts_index: the base station number.
908            cycle_duration: the new paging cycle duration in milliseconds.
909        """
910        raise NotImplementedError(
911            'This UXM callbox simulator does not support this feature.')
912
913    def set_phich_resource(self, bts_index, phich):
914        """Sets the PHICH Resource setting for the indicated base station.
915
916        Args:
917            bts_index: the base station number.
918            phich: the new PHICH resource setting.
919        """
920        raise NotImplementedError(
921            'This UXM callbox simulator does not support this feature.')
922
923    def lte_attach_secondary_carriers(self, ue_capability_enquiry):
924        """Activates the secondary carriers for CA.
925
926        Requires the DUT to be attached to the primary carrier first.
927
928        Args:
929            ue_capability_enquiry: UE capability enquiry message to be sent to
930              the UE before starting carrier aggregation.
931        """
932        raise NotImplementedError(
933            'This UXM callbox simulator does not support this feature.')
934
935    def wait_until_communication_state(self, timeout=120):
936        """Waits until the DUT is in Communication state.
937
938        Args:
939            timeout: after this amount of time the method will raise
940                a CellularSimulatorError exception. Default is 120 seconds.
941        """
942        raise NotImplementedError(
943            'This UXM callbox simulator does not support this feature.')
944
945    def wait_until_idle_state(self, timeout=120):
946        """Waits until the DUT is in Idle state.
947
948        Args:
949            timeout: after this amount of time the method will raise a
950                CellularSimulatorError exception. Default is 120 seconds.
951        """
952        # turn on RRC release
953        cell_type = self.cells[0][self.KEY_CELL_TYPE]
954        cell_number = self.cells[0][self.KEY_CELL_NUMBER]
955
956        # choose cmd base on cell type
957        cmd = None
958        if cell_type == 'LTE':
959            cmd = self.SCPI_RRC_RELEASE_LTE_CMD
960        else:
961            cmd = self.SCPI_RRC_RELEASE_NR_CMD
962
963        if not cmd:
964            raise RuntimeError(f'Cell type [{cell_type}] is not supporting IDLE.')
965
966        # checking status
967        self.log.info('Wait for IDLE state.')
968        for _ in range(5):
969            cell_state = self.get_cell_status(cell_type, cell_number)
970            self.log.info(f'cell state: {cell_state}')
971            if cell_state == 'CONN\n':
972                # RRC release
973                self._socket_send_SCPI_command(cmd.format(cell_type, cell_number))
974                # wait for status stable
975                time.sleep(60)
976            elif cell_state == 'IDLE\n':
977                return
978
979        raise RuntimeError('RRC release fail.')
980
981    def detach(self):
982        """ Turns off all the base stations so the DUT loose connection."""
983        for cell in self.cells:
984            cell_type = cell[self.KEY_CELL_TYPE]
985            cell_number = cell[self.KEY_CELL_NUMBER]
986            self.turn_cell_off(cell_type, cell_number)
987            time.sleep(5)
988
989    def stop(self):
990        """Stops current simulation.
991
992        After calling this method, the simulator will need to be set up again.
993        """
994        raise NotImplementedError(
995            'This UXM callbox simulator does not support this feature.')
996
997    def start_data_traffic(self):
998        """Starts transmitting data from the instrument to the DUT. """
999        raise NotImplementedError(
1000            'This UXM callbox simulator does not support this feature.')
1001
1002    def stop_data_traffic(self):
1003        """Stops transmitting data from the instrument to the DUT. """
1004        raise NotImplementedError(
1005            'This UXM callbox simulator does not support this feature.')
1006