• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2#
3#   Copyright 2021 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the 'License');
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an 'AS IS' BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17import time
18from enum import Enum
19
20from acts.controllers.rohdeschwarz_lib import cmx500
21from acts.controllers.rohdeschwarz_lib.cmx500 import LteBandwidth
22from acts.controllers.rohdeschwarz_lib.cmx500 import LteState
23from acts.controllers import cellular_simulator as cc
24from acts.controllers.cellular_lib import LteSimulation
25
# Lookup tables translating the simulator-agnostic LteSimulation enums into
# the matching cmx500 controller enums.

# Transmission modes use identical member names on both sides, so the table
# is generated from the list of supported names.
CMX_TM_MAPPING = {
    getattr(LteSimulation.TransmissionMode, tm_name):
    getattr(cmx500.TransmissionModes, tm_name)
    for tm_name in ('TM1', 'TM2', 'TM3', 'TM4', 'TM7', 'TM8', 'TM9')
}

# Scheduling modes listed here are the only ones set_scheduling_mode accepts.
CMX_SCH_MAPPING = {
    LteSimulation.SchedulingMode.STATIC:
    cmx500.SchedulingMode.USERDEFINEDCH,
}

# MIMO modes differ only by an underscore in the member name
# (MIMO_2x2 on the LteSimulation side, MIMO2x2 on the cmx500 side).
CMX_MIMO_MAPPING = {
    getattr(LteSimulation.MimoMode, 'MIMO_' + layout):
    getattr(cmx500.MimoModes, 'MIMO' + layout)
    for layout in ('1x1', '2x2', '4x4')
}
45
46
class ConfigurationMode(Enum):
    """ Pre-defined configuration modes selectable for the simulator. """

    # Selects the pre-defined configuration used for power measurements.
    Power = 'Power'
49
50
class CMX500CellularSimulator(cc.AbstractCellularSimulator):
    """ A cellular simulator for telephony simulations based on the CMX 500
    controller.

    Attributes:
        cmx: the cmx500.Cmx500 controller object created in __init__.
        bts: the base stations exposed by the controller (self.cmx.bts),
            indexed by base station number.
    """

    # The maximum power that the equipment is able to transmit
    MAX_DL_POWER = -25

    def __init__(self,
                 ip_address,
                 port='5025',
                 config_mode=ConfigurationMode.Power):
        """ Initializes the cellular simulator.

        Args:
            ip_address: the ip address of the CMX500
            port: the port number for the CMX500 controller
            config_mode: A pre-defined configuration mode to use.

        Raises:
            CellularSimulatorError: if the CMX500 controller object could not
                be created. The underlying error is chained as the cause.
        """
        super().__init__()
        try:
            self.cmx = cmx500.Cmx500(ip_address, port)
        except Exception as e:
            # Only catch Exception so KeyboardInterrupt / SystemExit are not
            # swallowed, and chain the cause so the root error stays visible.
            raise cc.CellularSimulatorError(
                'Error when Initializes CMX500.') from e

        self._config_mode = config_mode
        self.bts = self.cmx.bts

    def destroy(self):
        """ Sends finalization commands to the cellular equipment and closes
        the connection. """
        self.log.info('destroy the cmx500 simulator')
        self.cmx.disconnect()

    def setup_lte_scenario(self):
        """ Configures the equipment for an LTE simulation. """
        self.log.info('setup lte scenario')
        self.cmx.switch_lte_signalling(cmx500.LteState.LTE_ON)

    def setup_nr_sa_scenario(self):
        """ Configures the equipment for an NR stand alone simulation.

        Raises:
            NotImplementedError: always, NR stand alone is not supported here.
        """
        raise NotImplementedError()

    def setup_nr_nsa_scenario(self):
        """ Configures the equipment for an NR non stand alone simulation. """
        self.log.info('setup nsa scenario (start lte cell and nr cell')
        self.cmx.switch_on_nsa_signalling()

    def set_band_combination(self, bands, mimo_modes):
        """ Prepares the test equipment for the indicated band/mimo combination.

        Only the number of carriers is recorded; nothing is sent to the
        instrument and mimo_modes is currently not used.

        Args:
            bands: a list of bands represented as ints or strings
            mimo_modes: a list of LteSimulation.MimoMode to use for each carrier
        """
        self.num_carriers = len(bands)

    def set_lte_rrc_state_change_timer(self, enabled, time=10):
        """ Configures the LTE RRC state change timer.

        Args:
            enabled: a boolean indicating if the timer should be on or off.
            time: time in seconds for the timer to expire
        """
        # NOTE: the 'time' parameter shadows the time module inside this
        # method. The name is part of the public signature, so it is kept.
        self.log.info('set timer enabled to {} and the time to {}'.format(
            enabled, time))
        self.cmx.rrc_state_change_time_enable = enabled
        self.cmx.lte_rrc_state_change_timer = time

    def set_band(self, bts_index, band):
        """ Sets the band for the indicated base station.

        Args:
            bts_index: the base station number
            band: the new band, as an int or a value convertible with int()
        """
        self.log.info('set band to {}'.format(band))
        self.bts[bts_index].set_band(int(band))

    def get_duplex_mode(self, band):
        """ Determines if the band uses FDD or TDD duplex mode

        Args:
            band: a band number

        Returns:
            a variable of class DuplexMode indicating if band is FDD or TDD
        """
        # NOTE(review): only bands 33 to 46 are reported as TDD; any band
        # outside that range is reported as FDD. Confirm this range covers
        # every TDD band that needs to be supported.
        if 33 <= int(band) <= 46:
            return cmx500.DuplexMode.TDD
        return cmx500.DuplexMode.FDD

    def set_input_power(self, bts_index, input_power):
        """ Sets the input power for the indicated base station.

        Values above 23 are clamped to 23 before being applied. No lower
        bound is enforced here.

        Args:
            bts_index: the base station number
            input_power: the new input power
        """
        if input_power > 23:
            self.log.warning('Open loop supports -50dBm to 23 dBm. '
                             'Setting it to max power 23 dBm')
            input_power = 23
        self.log.info('set input power to {}'.format(input_power))
        self.bts[bts_index].set_ul_power(input_power)

    def set_output_power(self, bts_index, output_power):
        """ Sets the output power for the indicated base station.

        Args:
            bts_index: the base station number
            output_power: the new output power
        """
        self.log.info('set output power to {}'.format(output_power))
        self.bts[bts_index].set_dl_power(output_power)

    def set_tdd_config(self, bts_index, tdd_config):
        """ Sets the tdd configuration number for the indicated base station.

        Args:
            bts_index: the base station number
            tdd_config: the new tdd configuration number (from 0 to 6)
        """
        self.log.info('set tdd config to {}'.format(tdd_config))
        self.bts[bts_index].set_tdd_config(tdd_config)

    def set_ssf_config(self, bts_index, ssf_config):
        """ Sets the Special Sub-Frame config number for the indicated
        base station.

        Args:
            bts_index: the base station number
            ssf_config: the new ssf config number (from 0 to 9)
        """
        self.log.info('set ssf config to {}'.format(ssf_config))
        self.bts[bts_index].set_ssf_config(ssf_config)

    def set_bandwidth(self, bts_index, bandwidth):
        """ Sets the bandwidth for the indicated base station.

        Args:
            bts_index: the base station number
            bandwidth: the new bandwidth in MHz; converted with int() before
                being applied
        """
        self.log.info('set bandwidth of bts {} to {}'.format(
            bts_index, bandwidth))
        self.bts[bts_index].set_bandwidth(int(bandwidth))

    def set_downlink_channel_number(self, bts_index, channel_number):
        """ Sets the downlink channel number for the indicated base station.

        Args:
            bts_index: the base station number
            channel_number: the new channel number (earfcn)
        """
        self.log.info(
            'Sets the downlink channel number to {}'.format(channel_number))
        self.bts[bts_index].set_dl_channel(channel_number)

    def set_mimo_mode(self, bts_index, mimo_mode):
        """ Sets the mimo mode for the indicated base station.

        Args:
            bts_index: the base station number
            mimo_mode: the new mimo mode, a key of CMX_MIMO_MAPPING

        Raises:
            KeyError: if mimo_mode has no entry in CMX_MIMO_MAPPING.
        """
        self.log.info('set mimo mode to {}'.format(mimo_mode))
        cmx_mimo_mode = CMX_MIMO_MAPPING[mimo_mode]
        self.bts[bts_index].set_mimo_mode(cmx_mimo_mode)

    def set_transmission_mode(self, bts_index, tmode):
        """ Sets the transmission mode for the indicated base station.

        Args:
            bts_index: the base station number
            tmode: the new transmission mode, a key of CMX_TM_MAPPING

        Raises:
            KeyError: if tmode has no entry in CMX_TM_MAPPING.
        """
        self.log.info('set TransmissionMode to {}'.format(tmode))
        cmx_tmode = CMX_TM_MAPPING[tmode]
        self.bts[bts_index].set_transmission_mode(cmx_tmode)

    def set_scheduling_mode(self,
                            bts_index,
                            scheduling,
                            mcs_dl=None,
                            mcs_ul=None,
                            nrb_dl=None,
                            nrb_ul=None):
        """ Sets the scheduling mode for the indicated base station.

        The scheduling argument is only validated against CMX_SCH_MAPPING;
        the MCS and RB values are what gets forwarded to the base station.

        Args:
            bts_index: the base station number.
            scheduling: the new scheduling mode.
            mcs_dl: Downlink MCS.
            mcs_ul: Uplink MCS.
            nrb_dl: Number of RBs for downlink.
            nrb_ul: Number of RBs for uplink.

        Raises:
            CellularSimulatorError: if scheduling is not in CMX_SCH_MAPPING.
        """
        if scheduling not in CMX_SCH_MAPPING:
            raise cc.CellularSimulatorError(
                "This scheduling mode is not supported")

        requested = (
            ('mcs_dl', mcs_dl),
            ('mcs_ul', mcs_ul),
            ('nrb_dl', nrb_dl),
            ('nrb_ul', nrb_ul),
        )
        # Compare against None rather than relying on truthiness, so that a
        # legitimate value of 0 still shows up in the log.
        log_list = [
            '{}: {}'.format(name, value) for name, value in requested
            if value is not None
        ]

        self.log.info('set scheduling mode to {}'.format(','.join(log_list)))
        self.bts[bts_index].set_scheduling_mode(mcs_dl=mcs_dl,
                                                mcs_ul=mcs_ul,
                                                nrb_dl=nrb_dl,
                                                nrb_ul=nrb_ul)

    def set_dl_256_qam_enabled(self, bts_index, enabled):
        """ Determines what MCS table should be used for the downlink.

        Args:
            bts_index: the base station number
            enabled: whether 256 QAM should be used; otherwise Q64 is set
        """
        self.log.info('Set 256 QAM DL MCS enabled: ' + str(enabled))
        if enabled:
            modulation = cmx500.ModulationType.Q256
        else:
            modulation = cmx500.ModulationType.Q64
        self.bts[bts_index].set_dl_modulation_table(modulation)

    def set_ul_64_qam_enabled(self, bts_index, enabled):
        """ Determines what MCS table should be used for the uplink.

        Args:
            bts_index: the base station number
            enabled: whether 64 QAM should be used; otherwise Q16 is set
        """
        self.log.info('Set 64 QAM UL MCS enabled: ' + str(enabled))
        if enabled:
            modulation = cmx500.ModulationType.Q64
        else:
            modulation = cmx500.ModulationType.Q16
        self.bts[bts_index].set_ul_modulation_table(modulation)

    def set_mac_padding(self, bts_index, mac_padding):
        """ Enables or disables MAC padding in the indicated base station.

        Args:
            bts_index: the base station number
            mac_padding: the new MAC padding setting
        """
        self.log.info('set mac pad on {}'.format(mac_padding))
        self.bts[bts_index].set_dl_mac_padding(mac_padding)

    def set_cfi(self, bts_index, cfi):
        """ Sets the Channel Format Indicator for the indicated base station.

        'BESTEFFORT' leaves the instrument untouched. Any other value is
        converted with int() and incremented by one to obtain the index
        passed to the base station; values that cannot be converted fall
        back to index 1.

        Args:
            bts_index: the base station number
            cfi: the new CFI setting
        """
        if cfi == 'BESTEFFORT':
            self.log.info('The cfi is BESTEFFORT, use default value')
            return

        try:
            index = int(cfi) + 1
        except (TypeError, ValueError):
            # Best effort: an unparsable value falls back to index 1, but the
            # fallback is reported instead of happening silently.
            self.log.warning(
                'Invalid cfi {}, use cfi index 1 instead'.format(cfi))
            index = 1

        self.log.info('set the cfi and the cfi index is {}'.format(index))
        self.bts[bts_index].set_cfi(index)

    def set_paging_cycle(self, bts_index, cycle_duration):
        """ Sets the paging cycle duration for the indicated base station.

        Not implemented: only a warning is logged and nothing is sent to
        the instrument.

        Args:
            bts_index: the base station number
            cycle_duration: the new paging cycle duration in milliseconds
        """
        self.log.warning('The set_paging_cycle method is not implememted, '
                         'use default value')

    def set_phich_resource(self, bts_index, phich):
        """ Sets the PHICH Resource setting for the indicated base station.

        Not implemented: only a warning is logged and nothing is sent to
        the instrument.

        Args:
            bts_index: the base station number
            phich: the new PHICH resource setting
        """
        self.log.warning('The set_phich_resource method is not implememted, '
                         'use default value')

    def lte_attach_secondary_carriers(self, ue_capability_enquiry):
        """ Activates the secondary carriers for CA. Requires the DUT to be
        attached to the primary carrier first.

        Only base station 1 is attached as a secondary cell. When the
        simulator was created with ConfigurationMode.Power, the power
        measurement configuration is applied afterwards.

        Args:
            ue_capability_enquiry: UE capability enquiry message to be sent to
                the UE before starting carrier aggregation. Currently unused.
        """
        self.wait_until_communication_state()
        self.bts[1].attach_as_secondary_cell()
        # Fixed wait after attaching the secondary cell.
        time.sleep(10)

        if self._config_mode == ConfigurationMode.Power:
            self.configure_for_power_measurement()

        self.log.info('The radio connectivity is {}'.format(
            self.cmx.dut.state.radio_connectivity))

    def configure_for_power_measurement(self):
        """ Applies a pre-defined configuration for PDCCH power testing.

        Acts on base stations 0 and 1 and sleeps after every step, so the
        call blocks for 50 seconds in total in addition to instrument time.
        """
        self.log.info('set lte cdrx for nr nsa scenario')
        self.bts[0].set_cdrx_config()
        time.sleep(5)

        self.log.info('Disables mac padding')
        self.bts[0].set_dl_mac_padding(False)
        self.bts[1].set_dl_mac_padding(False)
        time.sleep(5)

        self.log.info('configure flexible slots and wait for 5 seconds')
        self.bts[1].config_flexible_slots()
        time.sleep(5)

        self.log.info('disable all ul subframes of the lte cell')
        self.bts[0].disable_all_ul_subframes()
        time.sleep(30)

        self.log.info('Disables Nr UL slots')
        self.bts[1].disable_all_ul_slots()
        time.sleep(5)

    def wait_until_attached(self, timeout=120):
        """ Waits until the DUT is attached to the primary carrier.

        Delegates to the controller's wait_until_attached.

        Args:
            timeout: time in seconds handed to the controller's wait call.
                Default is 120 seconds.
        """
        self.log.info('wait until attached')
        self.cmx.wait_until_attached(timeout)

    def wait_until_communication_state(self, timeout=120):
        """ Waits until the DUT is in Communication state.

        Delegates to the controller's wait_for_rrc_state with RRC_ON.

        Args:
            timeout: time in seconds handed to the controller's wait call.
                Default is 120 seconds.

        Returns:
            whatever the controller's wait_for_rrc_state returns; presumably
            True once the rrc state is reached within the timeout.

        Raises:
            presumably CmxError on timeout, raised by the controller;
            confirm against the cmx500 module.
        """
        self.log.info('wait for rrc on state')
        return self.cmx.wait_for_rrc_state(cmx500.RrcState.RRC_ON, timeout)

    def wait_until_idle_state(self, timeout=120):
        """ Waits until the DUT is in Idle state.

        Delegates to the controller's wait_for_rrc_state with RRC_OFF.

        Args:
            timeout: time in seconds handed to the controller's wait call.
                Default is 120 seconds.

        Returns:
            whatever the controller's wait_for_rrc_state returns; presumably
            True once the rrc state is reached within the timeout.

        Raises:
            presumably CmxError on timeout, raised by the controller;
            confirm against the cmx500 module.
        """
        self.log.info('wait for rrc off state')
        return self.cmx.wait_for_rrc_state(cmx500.RrcState.RRC_OFF, timeout)

    def wait_until_quiet(self, timeout=120):
        """Waits for all pending operations to finish on the simulator.

        Implemented by asking the controller's network object to apply its
        changes.

        Args:
            timeout: accepted for interface compatibility; currently not
                used by this implementation.
        """
        # NOTE(review): this reaches into a private attribute of the
        # controller; a public accessor would be preferable if one exists.
        self.cmx._network.apply_changes()

    def detach(self):
        """ Turns off all the base stations so the DUT loses connection.

        Currently bypassed: only a log message is emitted.
        """
        self.log.info('Bypass simulator detach step for now')

    def stop(self):
        """ Stops current simulation. After calling this method, the simulator
        will need to be set up again. """
        self.log.info('Stops current simulation and disconnect cmx500')
        self.cmx.disconnect()

    def start_data_traffic(self):
        """ Starts transmitting data from the instrument to the DUT.

        Not implemented: only a warning is logged.
        """
        self.log.warning('The start_data_traffic is not implemented yet')

    def stop_data_traffic(self):
        """ Stops transmitting data from the instrument to the DUT.

        Not implemented: only a warning is logged.
        """
        self.log.warning('The stop_data_traffic is not implemented yet')

    def send_sms(self, message):
        """ Sends an SMS message to the DUT.

        Args:
            message: the SMS message to send.
        """
        self.cmx.send_sms(message)
453