• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2#
3#   Copyright 2021 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#           http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17import logging
18import time
19import sys
20
21from enum import Enum
22from os import path
23from acts.controllers import abstract_inst
24
25DEFAULT_XLAPI_PATH = '/home/mobileharness/Rohde-Schwarz/XLAPI/latest/venv/lib/python3.7/site-packages'
26DEFAULT_LTE_STATE_CHANGE_TIMER = 10
27DEFAULT_CELL_SWITCH_ON_TIMER = 60
28DEFAULT_ENDC_TIMER = 300
29
30logger = logging.getLogger('Xlapi_cmx500')
31
32LTE_CELL_PROPERTIES = [
33    'band',
34    'bandwidth',
35    'dl_earfcn',
36    'ul_earfcn',
37    'total_dl_power',
38    'p_b',
39    'dl_epre',
40    'ref_signal_power',
41    'm',
42    'beamforming_antenna_ports',
43    'p0_nominal_pusch',
44]
45
46LTE_MHZ_UPPER_BOUND_TO_RB = [
47    (1.5, 6),
48    (4.0, 15),
49    (7.5, 25),
50    (12.5, 50),
51    (17.5, 75),
52]
53
54
55class DciFormat(Enum):
56    """Support DCI Formats for MIMOs."""
57    DCI_FORMAT_0 = 1
58    DCI_FORMAT_1 = 2
59    DCI_FORMAT_1A = 3
60    DCI_FORMAT_1B = 4
61    DCI_FORMAT_1C = 5
62    DCI_FORMAT_2 = 6
63    DCI_FORMAT_2A = 7
64    DCI_FORMAT_2B = 8
65    DCI_FORMAT_2C = 9
66    DCI_FORMAT_2D = 10
67
68
69class DuplexMode(Enum):
70    """Duplex Modes."""
71    FDD = 'FDD'
72    TDD = 'TDD'
73    DL_ONLY = 'DL_ONLY'
74
75
76class LteBandwidth(Enum):
77    """Supported LTE bandwidths."""
78    BANDWIDTH_1MHz = 6  # MHZ_1 is RB_6
79    BANDWIDTH_3MHz = 15  # MHZ_3 is RB_15
80    BANDWIDTH_5MHz = 25  # MHZ_5 is RB_25
81    BANDWIDTH_10MHz = 50  # MHZ_10 is RB_50
82    BANDWIDTH_15MHz = 75  # MHZ_15 is RB_75
83    BANDWIDTH_20MHz = 100  # MHZ_20 is RB_100
84
85
86class LteState(Enum):
87    """LTE ON and OFF."""
88    LTE_ON = 'ON'
89    LTE_OFF = 'OFF'
90
91
92class MimoModes(Enum):
93    """MIMO Modes dl antennas."""
94    MIMO1x1 = 1
95    MIMO2x2 = 2
96    MIMO4x4 = 4
97
98
99class ModulationType(Enum):
100    """Supported Modulation Types."""
101    Q16 = 0
102    Q64 = 1
103    Q256 = 2
104
105
106class NasState(Enum):
107    """NAS state between callbox and dut."""
108    DEREGISTERED = 'OFF'
109    EMM_REGISTERED = 'EMM'
110    MM5G_REGISTERED = 'NR'
111
112
113class RrcState(Enum):
114    """States to enable/disable rrc."""
115    RRC_ON = 'ON'
116    RRC_OFF = 'OFF'
117
118
119class RrcConnectionState(Enum):
120    """RRC Connection states, describes possible DUT RRC connection states."""
121    IDLE = 1
122    IDLE_PAGING = 2
123    IDLE_CONNECTION_ESTABLISHMENT = 3
124    CONNECTED = 4
125    CONNECTED_CONNECTION_REESTABLISHMENT = 5
126    CONNECTED_SCG_FAILURE = 6
127    CONNECTED_HANDOVER = 7
128    CONNECTED_CONNECTION_RELEASE = 8
129
130
131class SchedulingMode(Enum):
132    """Supported scheduling modes."""
133    USERDEFINEDCH = 'UDCHannels'
134
135
136class TransmissionModes(Enum):
137    """Supported transmission modes."""
138    TM1 = 1
139    TM2 = 2
140    TM3 = 3
141    TM4 = 4
142    TM7 = 7
143    TM8 = 8
144    TM9 = 9
145
146
147# For mimo 1x1, also set_num_crs_antenna_ports to 1
148MIMO_MAX_LAYER_MAPPING = {
149    MimoModes.MIMO1x1: 2,
150    MimoModes.MIMO2x2: 2,
151    MimoModes.MIMO4x4: 4,
152}
153
154
155class Cmx500(abstract_inst.SocketInstrument):
156    def __init__(self, ip_addr, port, xlapi_path=DEFAULT_XLAPI_PATH):
157        """Init method to setup variables for the controller.
158
159        Args:
160              ip_addr: Controller's ip address.
161              port: Port.
162        """
163
164        # keeps the socket connection for debug purpose for now
165        super().__init__(ip_addr, port)
166        if not xlapi_path in sys.path:
167            sys.path.insert(0, xlapi_path)
168        self._initial_xlapi()
169        self._settings.system.set_instrument_address(ip_addr)
170        logger.info('The instrument address is {}'.format(
171            self._settings.system.get_instrument_address()))
172
173        self.bts = []
174
175        # Stops all active cells if there is any
176        self.disconnect()
177
178        self.dut = self._network.get_dut()
179        self.lte_cell = self._network.create_lte_cell('ltecell0')
180        self.nr_cell = self._network.create_nr_cell('nrcell0')
181        self._config_antenna_ports()
182
183        self.lte_rrc_state_change_timer = DEFAULT_LTE_STATE_CHANGE_TIMER
184        self.rrc_state_change_time_enable = False
185        self.cell_switch_on_timer = DEFAULT_CELL_SWITCH_ON_TIMER
186
187    # _config_antenna_ports for the special RF connection with cmw500 + cmx500.
188    def _config_antenna_ports(self):
189        from rs_mrt.testenvironment.signaling.sri.rat.common import CsiRsAntennaPorts
190        from rs_mrt.testenvironment.signaling.sri.rat.lte import CrsAntennaPorts
191
192        max_csi_rs_ports = CsiRsAntennaPorts.NUMBER_CSI_RS_ANTENNA_PORTS_FOUR
193        max_crs_ports = CrsAntennaPorts.NUMBER_CRS_ANTENNA_PORTS_FOUR
194
195        lte_cell_max_config = self.lte_cell.stub.GetMaximumConfiguration()
196        lte_cell_max_config.csi_rs_antenna_ports = max_csi_rs_ports
197        lte_cell_max_config.crs_antenna_ports = max_crs_ports
198        self.lte_cell.stub.SetMaximumConfiguration(lte_cell_max_config)
199
200        nr_cell_max_config = self.nr_cell.stub.GetMaximumConfiguration()
201        nr_cell_max_config.csi_rs_antenna_ports = max_csi_rs_ports
202        self.nr_cell.stub.SetMaximumConfiguration(nr_cell_max_config)
203
204    def _initial_xlapi(self):
205        import xlapi
206        import mrtype
207        from xlapi import network
208        from xlapi import settings
209
210        self._xlapi = xlapi
211        self._network = network
212        self._settings = settings
213
214    def configure_mimo_settings(self, mimo, bts_index=0):
215        """Sets the mimo scenario for the test.
216
217        Args:
218            mimo: mimo scenario to set.
219        """
220        self.bts[bts_index].set_mimo_mode(mimo)
221
222    @property
223    def connection_type(self):
224        """Gets the connection type applied in callbox."""
225        state = self.dut.state.rrc_connection_state
226        return RrcConnectionState(state.value)
227
228    def create_base_station(self, cell):
229        """Creates the base station object with cell and current object.
230
231        Args:
232            cell: the XLAPI cell.
233
234        Returns:
235            base station object.
236        Raise:
237            CmxError if the cell is neither LTE nor NR.
238        """
239        from xlapi.lte_cell import LteCell
240        from xlapi.nr_cell import NrCell
241        if isinstance(cell, LteCell):
242            return LteBaseStation(self, cell)
243        elif isinstance(cell, NrCell):
244            return NrBaseStation(self, cell)
245        else:
246            raise CmxError('The cell type is neither LTE nor NR')
247
248    def detach(self):
249        """Detach callbox and controller."""
250        for bts in self.bts:
251            bts.stop()
252
253    def disable_packet_switching(self):
254        """Disable packet switching in call box."""
255        raise NotImplementedError()
256
257    def disconnect(self):
258        """Disconnect controller from device and switch to local mode."""
259
260        self.bts.clear()
261        self._network.reset()
262
263    def enable_packet_switching(self):
264        """Enable packet switching in call box."""
265        raise NotImplementedError()
266
267    def get_cell_configs(self, cell):
268        """Getes cell settings.
269
270        This method is for debugging purpose. In XLAPI, there are many get
271        methods in the cell or component carrier object. When this method is
272        called, the corresponding value could be recorded with logging, which is
273        useful for debug.
274
275        Args:
276            cell: the lte or nr cell in xlapi
277        """
278        cell_getters = [attr for attr in dir(cell) if attr.startswith('get')]
279        for attr in cell_getters:
280            try:
281                getter = getattr(cell, attr)
282                logger.info('The {} is {}'.format(attr, getter()))
283            except Exception as e:
284                logger.warning('Error in get {}: {}'.format(attr, e))
285
286    def get_base_station(self, bts_index=0):
287        """Gets the base station object based on bts num. By default
288        bts_index set to 0 (PCC).
289
290        Args:
291            bts_num: base station identifier
292
293        Returns:
294            base station object.
295        """
296        return self.bts[bts_index]
297
298    def get_network(self):
299        """ Gets the network object from cmx500 object."""
300        return self._network
301
302    def init_lte_measurement(self):
303        """Gets the class object for lte measurement which can be used to
304        initiate measurements.
305
306        Returns:
307            lte measurement object.
308        """
309        raise NotImplementedError()
310
311    def reset(self):
312        """System level reset."""
313
314        self.disconnect()
315
316    @property
317    def rrc_connection(self):
318        """Gets the RRC connection state."""
319        return self.dut.state.rrc.is_connected
320
321    def set_timer(self, timeout):
322        """Sets timer for the Cmx500 class."""
323        self.rrc_state_change_time_enable = True
324        self.lte_rrc_state_change_timer = timeout
325
326    def switch_lte_signalling(self, state):
327        """ Turns LTE signalling ON/OFF.
328
329        Args:
330            state: an instance of LteState indicating the state to which LTE
331                   signal has to be set.
332        """
333        if not isinstance(state, LteState):
334            raise ValueError('state should be the instance of LteState.')
335
336        if self.bts:
337            self.disconnect()
338        self.bts.append(LteBaseStation(self, self.lte_cell))
339        # Switch on the primary Lte cell for on state and switch all lte cells
340        # if the state is off state
341        if state.value == 'ON':
342            self.bts[0].start()
343            cell_status = self.bts[0].wait_cell_on(self.cell_switch_on_timer)
344            if cell_status:
345                logger.info('The LTE pcell status is on')
346            else:
347                raise CmxError('The LTE pcell cannot be switched on')
348        else:
349            for bts in self.bts:
350                if isinstance(bts, LteBaseStation):
351                    bts.stop()
352                logger.info('The LTE cell status is {} after stop'.format(
353                    bts.is_on()))
354
355    def switch_on_nsa_signalling(self):
356
357        from mrtype.counters import N310
358
359        if self.bts:
360            self.disconnect()
361        logger.info('Switches on NSA signalling')
362
363        # Sets n310 timer to N310.N20 to make endc more stable
364        logger.info('set nr cell n310 timer to N310.N20')
365        self.nr_cell.set_n310(N310.N20)
366        self.bts.append(LteBaseStation(self, self.lte_cell))
367        self.bts.append(NrBaseStation(self, self.nr_cell))
368
369        self.bts[0].start()
370        lte_cell_status = self.bts[0].wait_cell_on(self.cell_switch_on_timer)
371        if lte_cell_status:
372            logger.info('The LTE pcell status is on')
373        else:
374            raise CmxError('The LTE pcell cannot be switched on')
375
376        self.bts[1].start()
377        nr_cell_status = self.bts[1].wait_cell_on(self.cell_switch_on_timer)
378        if nr_cell_status:
379            logger.info('The NR cell status is on')
380        else:
381            raise CmxError('The NR cell cannot be switched on')
382        time.sleep(5)
383
384    def update_lte_cell_config(self, config):
385        """Updates lte cell settings with config."""
386        set_counts = 0
387        for property in LTE_CELL_PROPERTIES:
388            if property in config:
389                setter_name = 'set_' + property
390                setter = getattr(self.lte_cell, setter_name)
391                setter(config[property])
392                set_counts += 1
393        if set_counts < len(config):
394            logger.warning('Not all configs were set in update_cell_config')
395
396    @property
397    def use_carrier_specific(self):
398        """Gets current status of carrier specific duplex configuration."""
399        raise NotImplementedError()
400
401    @use_carrier_specific.setter
402    def use_carrier_specific(self, state):
403        """Sets the carrier specific duplex configuration.
404
405        Args:
406            state: ON/OFF UCS configuration.
407        """
408        raise NotImplementedError()
409
410    def wait_for_rrc_state(self, state, timeout=120):
411        """ Waits until a certain RRC state is set.
412
413        Args:
414            state: the RRC state that is being waited for.
415            timeout: timeout for phone to be in connected state.
416
417        Raises:
418            CmxError on time out.
419        """
420        is_idle = (state.value == 'OFF')
421        for idx in range(timeout):
422            time.sleep(1)
423            if self.dut.state.rrc.is_idle == is_idle:
424                logger.info('{} reached at {} s'.format(state.value, idx))
425                return True
426        error_message = 'Waiting for {} state timeout after {}'.format(
427            state.value, timeout)
428        logger.error(error_message)
429        raise CmxError(error_message)
430
431    def wait_until_attached(self, timeout=120):
432        """Waits until Lte attached.
433
434        Args:
435            timeout: timeout for phone to get attached.
436
437        Raises:
438            CmxError on time out.
439        """
440        try:
441            self.dut.signaling.wait_for_lte_attach(self.lte_cell, timeout)
442        except:
443            raise CmxError(
444                'wait_until_attached timeout after {}'.format(timeout))
445
446    def send_sms(self, message):
447        """ Sends an SMS message to the DUT.
448
449        Args:
450            message: the SMS message to send.
451        """
452        self.dut.signaling.mt_sms(message)
453
454
455class BaseStation(object):
456    """Class to interact with different the base stations."""
457    def __init__(self, cmx, cell):
458        """Init method to setup variables for base station.
459
460        Args:
461            cmx: Controller (Cmx500) object.
462            cell: The cell for the base station.
463        """
464
465        self._cell = cell
466        self._cmx = cmx
467        self._cc = cmx.dut.cc(cell)
468        self._network = cmx.get_network()
469
470    @property
471    def band(self):
472        """Gets the current band of cell.
473
474        Return:
475            the band number in int.
476        """
477        cell_band = self._cell.get_band()
478        return int(cell_band)
479
480    @property
481    def dl_power(self):
482        """Gets RSPRE level.
483
484        Return:
485            the power level in dbm.
486        """
487        return self._cell.get_total_dl_power().in_dBm()
488
489    @property
490    def duplex_mode(self):
491        """Gets current duplex of cell."""
492        band = self._cell.get_band()
493        if band.is_fdd():
494            return DuplexMode.FDD
495        if band.is_tdd():
496            return DuplexMode.TDD
497        if band.is_dl_only():
498            return DuplexMode.DL_ONLY
499
500    def get_cc(self):
501        """Gets component carrier of the cell."""
502        return self._cc
503
504    def is_on(self):
505        """Verifies if the cell is turned on.
506
507            Return:
508                boolean (if the cell is on).
509        """
510        return self._cell.is_on()
511
512    def set_band(self, band):
513        """Sets the Band of cell.
514
515        Args:
516            band: band of cell.
517        """
518        self._cell.set_band(band)
519        logger.info('The band is set to {} and is {} after setting'.format(
520            band, self.band))
521
522    def set_dl_mac_padding(self, state):
523        """Enables/Disables downlink padding at the mac layer.
524
525        Args:
526            state: a boolean
527        """
528        self._cc.set_dl_mac_padding(state)
529
530    def set_dl_power(self, pwlevel):
531        """Modifies RSPRE level.
532
533        Args:
534            pwlevel: power level in dBm.
535        """
536        self._cell.set_total_dl_power(pwlevel)
537
538    def set_ul_power(self, ul_power):
539        """Sets ul power
540
541        Args:
542            ul_power: the uplink power in dbm
543        """
544        self._cc.set_target_ul_power(ul_power)
545
546    def start(self):
547        """Starts the cell."""
548        self._cell.start()
549
550    def stop(self):
551        """Stops the cell."""
552        self._cell.stop()
553
554    def wait_cell_on(self, timeout):
555        """Waits the cell on.
556
557        Args:
558            timeout: the time for waiting the cell on.
559
560        Raises:
561            CmxError on time out.
562        """
563        waiting_time = 0
564        while waiting_time < timeout:
565            if self._cell.is_on():
566                return True
567            waiting_time += 1
568            time.sleep(1)
569        return self._cell.is_on()
570
571
572class LteBaseStation(BaseStation):
573    """ LTE base station."""
574    def __init__(self, cmx, cell):
575        """Init method to setup variables for the LTE base station.
576
577        Args:
578            cmx: Controller (Cmx500) object.
579            cell: The cell for the LTE base station.
580        """
581        from xlapi.lte_cell import LteCell
582        if not isinstance(cell, LteCell):
583            raise CmxError(
584                'The cell is not a LTE cell, LTE base station  fails'
585                ' to create.')
586        super().__init__(cmx, cell)
587
588    def _config_scheduler(self,
589                          dl_mcs=None,
590                          dl_rb_alloc=None,
591                          dl_dci_ncce=None,
592                          dl_dci_format=None,
593                          dl_tm=None,
594                          dl_num_layers=None,
595                          dl_mcs_table=None,
596                          ul_mcs=None,
597                          ul_rb_alloc=None,
598                          ul_dci_ncce=None):
599
600        from rs_mrt.testenvironment.signaling.sri.rat.lte import DciFormat
601        from rs_mrt.testenvironment.signaling.sri.rat.lte import DlTransmissionMode
602        from rs_mrt.testenvironment.signaling.sri.rat.lte import MaxLayersMIMO
603        from rs_mrt.testenvironment.signaling.sri.rat.lte import McsTable
604        from rs_mrt.testenvironment.signaling.sri.rat.lte import PdcchFormat
605
606        log_list = []
607        if dl_mcs:
608            log_list.append('dl_mcs: {}'.format(dl_mcs))
609        if ul_mcs:
610            log_list.append('ul_mcs: {}'.format(ul_mcs))
611        if dl_rb_alloc:
612            log_list.append('dl_rb_alloc: {}'.format(dl_rb_alloc))
613        if ul_rb_alloc:
614            log_list.append('ul_rb_alloc: {}'.format(ul_rb_alloc))
615        if dl_dci_ncce:
616            dl_dci_ncce = PdcchFormat(dl_dci_ncce)
617            log_list.append('dl_dci_ncce: {}'.format(dl_dci_ncce))
618        if ul_dci_ncce:
619            ul_dci_ncce = PdcchFormat(ul_dci_ncce)
620            log_list.append('ul_dci_ncce: {}'.format(ul_dci_ncce))
621        if dl_dci_format:
622            dl_dci_format = DciFormat(dl_dci_format)
623            log_list.append('dl_dci_format: {}'.format(dl_dci_format))
624        if dl_tm:
625            dl_tm = DlTransmissionMode(dl_tm.value)
626            log_list.append('dl_tm: {}'.format(dl_tm))
627        if dl_num_layers:
628            dl_num_layers = MaxLayersMIMO(dl_num_layers)
629            log_list.append('dl_num_layers: {}'.format(dl_num_layers))
630        if dl_mcs_table:
631            dl_mcs_table = McsTable(dl_mcs_table)
632            log_list.append('dl_mcs_table: {}'.format(dl_mcs_table))
633
634        num_crs_antenna_ports = self._cell.get_num_crs_antenna_ports()
635
636        # Sets num of crs antenna ports to 4 for configuring
637        self._cell.set_num_crs_antenna_ports(4)
638        scheduler = self._cmx.dut.get_scheduler(self._cell)
639        logger.info('configure scheduler for {}'.format(','.join(log_list)))
640        scheduler.configure_scheduler(dl_mcs=dl_mcs,
641                                      dl_rb_alloc=dl_rb_alloc,
642                                      dl_dci_ncce=dl_dci_ncce,
643                                      dl_dci_format=dl_dci_format,
644                                      dl_tm=dl_tm,
645                                      dl_num_layers=dl_num_layers,
646                                      dl_mcs_table=dl_mcs_table,
647                                      ul_mcs=ul_mcs,
648                                      ul_rb_alloc=ul_rb_alloc,
649                                      ul_dci_ncce=ul_dci_ncce)
650        logger.info('Configure scheduler succeeds')
651
652        # Sets num of crs antenna ports back to previous value
653        self._cell.set_num_crs_antenna_ports(num_crs_antenna_ports)
654        self._network.apply_changes()
655
656    @property
657    def bandwidth(self):
658        """Get the channel bandwidth of the cell.
659
660        Return:
661            the number rb of the bandwidth.
662        """
663        return self._cell.get_bandwidth().num_rb
664
665    @property
666    def dl_channel(self):
667        """Gets the downlink channel of cell.
668
669        Return:
670            the downlink channel (earfcn) in int.
671        """
672        return int(self._cell.get_dl_earfcn())
673
674    @property
675    def dl_frequency(self):
676        """Get the downlink frequency of the cell."""
677        from mrtype.frequency import Frequency
678        return self._cell.get_dl_earfcn().to_freq().in_units(
679            Frequency.Units.GHz)
680
681    def _to_rb_bandwidth(self, bandwidth):
682        for idx in range(5):
683            if bandwidth < LTE_MHZ_UPPER_BOUND_TO_RB[idx][0]:
684                return LTE_MHZ_UPPER_BOUND_TO_RB[idx][1]
685        return 100
686
687    def disable_all_ul_subframes(self):
688        """Disables all ul subframes for LTE cell."""
689        self._cc.disable_all_ul_subframes()
690        self._network.apply_changes()
691
692    def set_bandwidth(self, bandwidth):
693        """Sets the channel bandwidth of the cell.
694
695        Args:
696            bandwidth: channel bandwidth of cell in MHz.
697        """
698        self._cell.set_bandwidth(self._to_rb_bandwidth(bandwidth))
699        self._network.apply_changes()
700
701    def set_cdrx_config(self):
702        """Sets LTE cdrx config for endc."""
703        from mrtype.lte.drx import (
704            LteDrxConfig,
705            LteDrxInactivityTimer,
706            LteDrxLongCycleStartOffset,
707            LteDrxOnDurationTimer,
708            LteDrxRetransmissionTimer,
709        )
710
711        logger.info('Config Lte drx config')
712        lte_drx_config = LteDrxConfig(
713            on_duration_timer=LteDrxOnDurationTimer.PSF_10,
714            inactivity_timer=LteDrxInactivityTimer.PSF_200,
715            retransmission_timer=LteDrxRetransmissionTimer.PSF_33,
716            long_cycle_start_offset=LteDrxLongCycleStartOffset.ms160(83),
717            short_drx=None)
718        self._cmx.dut.lte_cell_group().set_drx_and_adjust_scheduler(
719            drx_config=lte_drx_config)
720        self._network.apply_changes()
721
722    def set_cell_frequency_band(self, tdd_cfg=None, ssf_cfg=None):
723        """Sets cell frequency band with tdd and ssf config.
724
725        Args:
726            tdd_cfg: the tdd subframe assignment config in number (from 0-6).
727            ssf_cfg: the special subframe pattern config in number (from 1-9).
728        """
729        from rs_mrt.testenvironment.signaling.sri.rat.lte import SpecialSubframePattern
730        from rs_mrt.testenvironment.signaling.sri.rat.lte import SubFrameAssignment
731        from rs_mrt.testenvironment.signaling.sri.rat.lte.config import CellFrequencyBand
732        from rs_mrt.testenvironment.signaling.sri.rat.lte.config import Tdd
733        tdd_subframe = None
734        ssf_pattern = None
735        if tdd_cfg:
736            tdd_subframe = SubFrameAssignment(tdd_cfg + 1)
737        if ssf_cfg:
738            ssf_pattern = SpecialSubframePattern(ssf_cfg)
739        tdd = Tdd(tdd_config=Tdd.TddConfigSignaling(
740            subframe_assignment=tdd_subframe,
741            special_subframe_pattern=ssf_pattern))
742        self._cell.stub.SetCellFrequencyBand(CellFrequencyBand(tdd=tdd))
743        self._network.apply_changes()
744
745    def set_cfi(self, cfi):
746        """Sets number of pdcch symbols (cfi).
747
748        Args:
749            cfi: the value of NumberOfPdcchSymbols
750        """
751        from rs_mrt.testenvironment.signaling.sri.rat.lte import NumberOfPdcchSymbols
752        from rs_mrt.testenvironment.signaling.sri.rat.lte.config import PdcchRegionReq
753
754        logger.info('The cfi enum to set is {}'.format(
755            NumberOfPdcchSymbols(cfi)))
756        req = PdcchRegionReq()
757        req.num_pdcch_symbols = NumberOfPdcchSymbols(cfi)
758        self._cell.stub.SetPdcchControlRegion(req)
759
760    def set_dci_format(self, dci_format):
761        """Selects the downlink control information (DCI) format.
762
763        Args:
764            dci_format: supported dci.
765        """
766        if not isinstance(dci_format, DciFormat):
767            raise CmxError('Wrong type for dci_format')
768        self._config_scheduler(dl_dci_format=dci_format.value)
769
770    def set_dl_channel(self, channel):
771        """Sets the downlink channel number of cell.
772
773        Args:
774            channel: downlink channel number of cell.
775        """
776        if self.dl_channel == channel:
777            logger.info('The dl_channel was at {}'.format(self.dl_channel))
778            return
779        self._cell.set_earfcn(channel)
780        logger.info('The dl_channel was set to {}'.format(self.dl_channel))
781
782    def set_dl_modulation_table(self, modulation):
783        """Sets down link modulation table.
784
785        Args:
786            modulation: modulation table setting (ModulationType).
787        """
788        if not isinstance(modulation, ModulationType):
789            raise CmxError('The modulation is not the type of Modulation')
790        self._config_scheduler(dl_mcs_table=modulation.value)
791
792    def set_mimo_mode(self, mimo):
793        """Sets mimo mode for Lte scenario.
794
795        Args:
796            mimo: the mimo mode.
797        """
798        if not isinstance(mimo, MimoModes):
799            raise CmxError("Wrong type of mimo mode")
800
801        self._cell.set_num_crs_antenna_ports(mimo.value)
802        self._config_scheduler(dl_num_layers=MIMO_MAX_LAYER_MAPPING[mimo])
803
804    def set_scheduling_mode(self,
805                            mcs_dl=None,
806                            mcs_ul=None,
807                            nrb_dl=None,
808                            nrb_ul=None):
809        """Sets scheduling mode.
810
811        Args:
812            scheduling: the new scheduling mode.
813            mcs_dl: Downlink MCS.
814            mcs_ul: Uplink MCS.
815            nrb_dl: Number of RBs for downlink.
816            nrb_ul: Number of RBs for uplink.
817        """
818        self._config_scheduler(dl_mcs=mcs_dl,
819                               ul_mcs=mcs_ul,
820                               dl_rb_alloc=nrb_dl,
821                               ul_rb_alloc=nrb_ul)
822
823    def set_ssf_config(self, ssf_config):
824        """Sets ssf subframe assignment with tdd_config.
825
826        Args:
827            ssf_config: the special subframe pattern config (from 1-9).
828        """
829        self.set_cell_frequency_band(ssf_cfg=ssf_config)
830
831    def set_tdd_config(self, tdd_config):
832        """Sets tdd subframe assignment with tdd_config.
833
834        Args:
835            tdd_config: the subframe assignemnt config (from 0-6).
836        """
837        self.set_cell_frequency_band(tdd_cfg=tdd_config)
838
839    def set_transmission_mode(self, transmission_mode):
840        """Sets transmission mode with schedular.
841
842        Args:
843            transmission_mode: the download link transmission mode.
844        """
845        from rs_mrt.testenvironment.signaling.sri.rat.lte import DlTransmissionMode
846        if not isinstance(transmission_mode, TransmissionModes):
847            raise CmxError('Wrong type of the trasmission mode')
848        dl_tm = DlTransmissionMode(transmission_mode.value)
849        logger.info('set dl tm to {}'.format(dl_tm))
850        self._cc.set_dl_tm(dl_tm)
851        self._network.apply_changes()
852
853    def set_ul_channel(self, channel):
854        """Sets the up link channel number of cell.
855
856        Args:
857            channel: up link channel number of cell.
858        """
859        if self.ul_channel == channel:
860            logger.info('The ul_channel is at {}'.format(self.ul_channel))
861            return
862        self._cell.set_earfcn(channel)
863        logger.info('The dl_channel was set to {}'.format(self.ul_channel))
864
865    @property
866    def ul_channel(self):
867        """Gets the uplink channel of cell.
868
869        Return:
870            the uplink channel (earfcn) in int
871        """
872        return int(self._cell.get_ul_earfcn())
873
874    @property
875    def ul_frequency(self):
876        """Get the uplink frequency of the cell.
877
878        Return:
879            The uplink frequency in GHz.
880        """
881        from mrtype.frequency import Frequency
882        return self._cell.get_ul_earfcn().to_freq().in_units(
883            Frequency.Units.GHz)
884
885    def set_ul_modulation_table(self, modulation):
886        """Sets up link modulation table.
887
888        Args:
889            modulation: modulation table setting (ModulationType).
890        """
891        if not isinstance(modulation, ModulationType):
892            raise CmxError('The modulation is not the type of Modulation')
893        if modulation == ModulationType.Q16:
894            self._cell.stub.SetPuschCommonConfig(False)
895        else:
896            self._cell.stub.SetPuschCommonConfig(True)
897
898
899class NrBaseStation(BaseStation):
900    """ NR base station."""
901    def __init__(self, cmx, cell):
902        """Init method to setup variables for the NR base station.
903
904        Args:
905            cmx: Controller (Cmx500) object.
906            cell: The cell for the NR base station.
907        """
908        from xlapi.nr_cell import NrCell
909        if not isinstance(cell, NrCell):
910            raise CmxError('the cell is not a NR cell, NR base station  fails'
911                           ' to creat.')
912
913        super().__init__(cmx, cell)
914
915    def _config_scheduler(self,
916                          dl_mcs=None,
917                          dl_mcs_table=None,
918                          dl_rb_alloc=None,
919                          dl_mimo_mode=None,
920                          ul_mcs=None,
921                          ul_mcs_table=None,
922                          ul_rb_alloc=None,
923                          ul_mimo_mode=None):
924
925        from rs_mrt.testenvironment.signaling.sri.rat.nr import McsTable
926
927        log_list = []
928        if dl_mcs:
929            log_list.append('dl_mcs: {}'.format(dl_mcs))
930        if ul_mcs:
931            log_list.append('ul_mcs: {}'.format(ul_mcs))
932
933        # If rb alloc is not a tuple, add 0 as start RBs for XLAPI NR scheduler
934        if dl_rb_alloc:
935            if not isinstance(dl_rb_alloc, tuple):
936                dl_rb_alloc = (0, dl_rb_alloc)
937            log_list.append('dl_rb_alloc: {}'.format(dl_rb_alloc))
938        if ul_rb_alloc:
939            if not isinstance(ul_rb_alloc, tuple):
940                ul_rb_alloc = (0, ul_rb_alloc)
941            log_list.append('ul_rb_alloc: {}'.format(ul_rb_alloc))
942        if dl_mcs_table:
943            dl_mcs_table = McsTable(dl_mcs_table)
944            log_list.append('dl_mcs_table: {}'.format(dl_mcs_table))
945        if ul_mcs_table:
946            ul_mcs_table = McsTable(ul_mcs_table)
947            log_list.append('ul_mcs_table: {}'.format(ul_mcs_table))
948        if dl_mimo_mode:
949            log_list.append('dl_mimo_mode: {}'.format(dl_mimo_mode))
950        if ul_mimo_mode:
951            log_list.append('ul_mimo_mode: {}'.format(ul_mimo_mode))
952
953        scheduler = self._cmx.dut.get_scheduler(self._cell)
954        logger.info('configure scheduler for {}'.format(','.join(log_list)))
955
956        scheduler.configure_ue_scheduler(dl_mcs=dl_mcs,
957                                         dl_mcs_table=dl_mcs_table,
958                                         dl_rb_alloc=dl_rb_alloc,
959                                         dl_mimo_mode=dl_mimo_mode,
960                                         ul_mcs=ul_mcs,
961                                         ul_mcs_table=ul_mcs_table,
962                                         ul_rb_alloc=ul_rb_alloc,
963                                         ul_mimo_mode=ul_mimo_mode)
964        logger.info('Configure scheduler succeeds')
965        self._network.apply_changes()
966
967    def attach_as_secondary_cell(self, endc_timer=DEFAULT_ENDC_TIMER):
968        """Enable endc mode for NR cell.
969
970        Args:
971            endc_timer: timeout for endc state
972        """
973        logger.info('enable endc mode for nsa dual connection')
974        self._cmx.dut.signaling.nsa_dual_connect(self._cell)
975        time_count = 0
976        while time_count < endc_timer:
977            if str(self._cmx.dut.state.radio_connectivity) == \
978                    'RadioConnectivityMode.EPS_LTE_NR':
979                logger.info('enter endc mode')
980                return
981            time.sleep(1)
982            time_count += 1
983            if time_count % 30 == 0:
984                logger.info('did not reach endc at {} s'.format(time_count))
985        raise CmxError('Cannot reach endc after {} s'.format(endc_timer))
986
987    def config_flexible_slots(self):
988        """Configs flexible slots for NR cell."""
989
990        from rs_mrt.testenvironment.signaling.sri.rat.nr.config import CellFrequencyBandReq
991        from rs_mrt.testenvironment.signaling.sri.rat.common import SetupRelease
992
993        logger.info('Config flexible slots')
994        req = CellFrequencyBandReq()
995        req.band.frequency_range.fr1.tdd.tdd_config_common.setup_release = SetupRelease.RELEASE
996        self._cell.stub.SetCellFrequencyBand(req)
997        self._network.apply_changes()
998
999    def disable_all_ul_slots(self):
1000        """Disables all ul slots for NR cell"""
1001        self._cc.disable_all_ul_slots()
1002
1003    @property
1004    def dl_channel(self):
1005        """Gets the downlink channel of cell.
1006
1007        Return:
1008            the downlink channel (nr_arfcn) in int.
1009        """
1010        return int(self._cell.get_dl_ref_a())
1011
1012    def _bandwidth_to_carrier_bandwidth(self, bandwidth):
1013        """Converts bandwidth in MHz to CarrierBandwidth.
1014            CarrierBandwidth Enum in XLAPI:
1015                MHZ_5 = 0
1016                MHZ_10 = 1
1017                MHZ_15 = 2
1018                MHZ_20 = 3
1019                MHZ_25 = 4
1020                MHZ_30 = 5
1021                MHZ_40 = 6
1022                MHZ_50 = 7
1023                MHZ_60 = 8
1024                MHZ_70 = 9
1025                MHZ_80 = 10
1026                MHZ_90 = 11
1027                MHZ_100 = 12
1028                MHZ_200 = 13
1029                MHZ_400 = 14
1030        Args:
1031            bandwidth: channel bandwidth in MHz.
1032
1033        Return:
1034            the corresponding NR Carrier Bandwidth.
1035        """
1036        from mrtype.nr.frequency import CarrierBandwidth
1037        if bandwidth > 100:
1038            return CarrierBandwidth(12 + bandwidth // 200)
1039        elif bandwidth > 30:
1040            return CarrierBandwidth(2 + bandwidth // 10)
1041        else:
1042            return CarrierBandwidth(bandwidth // 5 - 1)
1043
1044    def set_bandwidth(self, bandwidth, scs=None):
1045        """Sets the channel bandwidth of the cell.
1046
1047        Args:
1048            bandwidth: channel bandwidth of cell.
1049            scs: subcarrier spacing (SCS) of resource grid 0
1050        """
1051        if not scs:
1052            scs = self._cell.get_scs()
1053        self._cell.set_carrier_bandwidth_and_scs(
1054            self._bandwidth_to_carrier_bandwidth(bandwidth), scs)
1055        logger.info(
1056            'The bandwidth in MHz is {}. After setting, the value is {}'.
1057            format(bandwidth, str(self._cell.get_carrier_bandwidth())))
1058
1059    def set_dl_channel(self, channel):
1060        """Sets the downlink channel number of cell.
1061
1062        Args:
1063            channel: downlink channel number of cell or Frequency Range ('LOW',
1064                     'MID' or 'HIGH').
1065        """
1066        from mrtype.nr.frequency import NrArfcn
1067        from mrtype.frequency import FrequencyRange
1068
1069        # When the channel is string, set it use Frenquency Range
1070        if isinstance(channel, str):
1071            logger.info('Sets dl channel Frequency Range {}'.format(channel))
1072            frequency_range = FrequencyRange.LOW
1073            if channel.upper() == 'MID':
1074                frequency_range = FrequencyRange.MID
1075            elif channel.upper() == 'HIGH':
1076                frequency_range = FrequencyRange.HIGH
1077            self._cell.set_dl_ref_a_offset(self.band, frequency_range)
1078            logger.info('The dl_channel was set to {}'.format(self.dl_channel))
1079            return
1080        if self.dl_channel == channel:
1081            logger.info('The dl_channel was at {}'.format(self.dl_channel))
1082            return
1083        self._cell.set_dl_ref_a_offset(self.band, NrArfcn(channel))
1084        logger.info('The dl_channel was set to {}'.format(self.dl_channel))
1085
1086    def set_dl_modulation_table(self, modulation):
1087        """Sets down link modulation table.
1088
1089        Args:
1090            modulation: modulation table setting (ModulationType).
1091        """
1092        if not isinstance(modulation, ModulationType):
1093            raise CmxError('The modulation is not the type of Modulation')
1094        self._config_scheduler(dl_mcs_table=modulation.value)
1095
1096    def set_mimo_mode(self, mimo):
1097        """Sets mimo mode for NR nsa scenario.
1098
1099        Args:
1100            mimo: the mimo mode.
1101        """
1102        from rs_mrt.testenvironment.signaling.sri.rat.nr import DownlinkMimoMode
1103        if not isinstance(mimo, MimoModes):
1104            raise CmxError("Wrong type of mimo mode")
1105
1106        self._config_scheduler(dl_mimo_mode=DownlinkMimoMode.Enum(mimo.value))
1107
1108    def set_scheduling_mode(self,
1109                            mcs_dl=None,
1110                            mcs_ul=None,
1111                            nrb_dl=None,
1112                            nrb_ul=None):
1113        """Sets scheduling mode.
1114
1115        Args:
1116            mcs_dl: Downlink MCS.
1117            mcs_ul: Uplink MCS.
1118            nrb_dl: Number of RBs for downlink.
1119            nrb_ul: Number of RBs for uplink.
1120        """
1121        self._config_scheduler(dl_mcs=mcs_dl,
1122                               ul_mcs=mcs_ul,
1123                               dl_rb_alloc=nrb_dl,
1124                               ul_rb_alloc=nrb_ul)
1125
1126    def set_ssf_config(self, ssf_config):
1127        """Sets ssf subframe assignment with tdd_config.
1128
1129        Args:
1130            ssf_config: the special subframe pattern config (from 1-9).
1131        """
1132        raise CmxError('the set ssf config for nr did not implemente yet')
1133
1134    def set_tdd_config(self, tdd_config):
1135        """Sets tdd subframe assignment with tdd_config.
1136
1137        Args:
1138            tdd_config: the subframe assignemnt config (from 0-6).
1139        """
1140        raise CmxError('the set tdd config for nr did not implemente yet')
1141
1142    def set_transmission_mode(self, transmission_mode):
1143        """Sets transmission mode with schedular.
1144
1145        Args:
1146            transmission_mode: the download link transmission mode.
1147        """
1148        logger.info('The set transmission mode for nr is set by mimo mode')
1149
1150    def set_ul_modulation_table(self, modulation):
1151        """Sets down link modulation table.
1152
1153        Args:
1154            modulation: modulation table setting (ModulationType).
1155        """
1156        if not isinstance(modulation, ModulationType):
1157            raise CmxError('The modulation is not the type of Modulation')
1158        self._config_scheduler(ul_mcs_table=modulation.value)
1159
1160
1161class CmxError(Exception):
1162    """Class to raise exceptions related to cmx."""
1163