#!/usr/bin/env python3 # # Copyright 2019 - The Android Open Source Project # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import time from enum import Enum from acts.controllers import abstract_inst LTE_ATTACH_RESP = 'ATT' LTE_CONN_RESP = 'CONN' LTE_IDLE_RESP = 'IDLE' LTE_PSWITCHED_ON_RESP = 'ON' LTE_PSWITCHED_OFF_RESP = 'OFF' WCDMA_ATTACH_RESP = 'ATT' WCDMA_CESTABLISHED_RESP = 'CEST' WCDMA_PSWITCHED_ON_RESP = 'ON' STATE_CHANGE_TIMEOUT = 20 class IPAddressType(Enum): """ IP Address types""" IPV4 = "IPV4" IPV6 = "IPV6" IPV4V6 = "IPV46" class SignallingState(Enum): """Signalling states common to all RATs.""" ON = 'ON' OFF = 'OFF' ReadyForHandover = 'RFH' class SccActivationMode(Enum): """Activation mode to use for SCCs.""" AUTO = 'AUTO' MANUAL = 'MAN' SEMI_AUTO = 'SEM' class SccState(Enum): """Secondary component carrier states.""" ON = 'ON' OFF = 'OFF' RRC = 'RRC' MAC = 'MAC' class LteState(Enum): """LTE ON and OFF""" LTE_ON = 'ON' LTE_OFF = 'OFF' class BtsNumber(Enum): """Base station Identifiers.""" BTS1 = 'PCC' BTS2 = 'SCC1' BTS3 = 'SCC2' BTS4 = 'SCC3' BTS5 = 'SCC4' BTS6 = 'SCC6' BTS7 = 'SCC7' class LteBandwidth(Enum): """Supported LTE bandwidths.""" BANDWIDTH_1MHz = 'B014' BANDWIDTH_3MHz = 'B030' BANDWIDTH_5MHz = 'B050' BANDWIDTH_10MHz = 'B100' BANDWIDTH_15MHz = 'B150' BANDWIDTH_20MHz = 'B200' def BandwidthFromFloat(bw): if bw == 20: return LteBandwidth.BANDWIDTH_20MHz elif bw == 15: return LteBandwidth.BANDWIDTH_15MHz elif bw == 10: return LteBandwidth.BANDWIDTH_10MHz elif bw == 5: return LteBandwidth.BANDWIDTH_5MHz elif bw == 3: return LteBandwidth.BANDWIDTH_3MHz elif bw == 1.4: return LteBandwidth.BANDWIDTH_1MHz raise ValueError('Bandwidth {} MHz is not valid for LTE'.format(bandwidth)) class DrxMode(Enum): """DRX Modes.""" DRXS = 'DRXS' DRXL = 'DRXL' USER_DEFINED = 'UDEF' ON = 'ON' OFF = 'OFF' class DuplexMode(Enum): """Duplex Modes""" FDD = 'FDD' TDD = 'TDD' class SchedulingMode(Enum): """Supported scheduling modes.""" RMC = 'RMC' USERDEFINEDCH = 'UDCHannels' class TransmissionModes(Enum): """Supported transmission modes.""" TM1 = 'TM1' TM2 = 'TM2' TM3 = 'TM3' TM4 = 'TM4' TM7 = 'TM7' TM8 = 'TM8' TM9 = 'TM9' class UseCarrierSpecific(Enum): """Enable or disable carrier specific.""" UCS_ON = 'ON' UCS_OFF = 'OFF' class RbPosition(Enum): """Supported RB positions.""" LOW = 'LOW' HIGH = 'HIGH' P5 = 'P5' P10 = 'P10' P23 = 'P23' P35 = 'P35' P48 = 'P48' class ModulationType(Enum): """Supported Modulation Types.""" QPSK = 'QPSK' Q16 = 'Q16' Q64 = 'Q64' Q256 = 'Q256' class DciFormat(Enum): """Support DCI Formats for MIMOs""" D1 = 'D1' D1A = 'D1A' D1B = 'D1B' D2 = 'D2' D2A = 'D2A' D2B = 'D2B' D2C = 'D2C' class MimoModes(Enum): """MIMO Modes dl antennas""" MIMO1x1 = 'ONE' MIMO2x2 = 'TWO' MIMO4x4 = 'FOUR' class MimoScenario(Enum): """Supported mimo scenarios""" SCEN1x1 = 'SCELl:FLEXible SUA1,RF1C,RX1,RF1C,TX1' SCEN2x2 = 'TRO:FLEXible SUA1,RF1C,RX1,RF1C,TX1,RF3C,TX2' SCEN4x4 = 'FRO FLEXible SUA1,RF1C,RX1,RF1C,TX1,RF3C,TX2,RF2C,TX3,RF4C,TX4' class RrcState(Enum): """States to enable/disable rrc.""" RRC_ON = 'ON' RRC_OFF = 'OFF' class MacPadding(Enum): """Enables/Disables Mac Padding.""" ON = 'ON' OFF = 'OFF' class ConnectionType(Enum): """Supported Connection Types.""" TEST = 'TESTmode' DAU = 'DAPPlication' class RepetitionMode(Enum): """Specifies LTE Measurement Repetition Mode.""" SINGLESHOT = 'SINGleshot' CONTINUOUS = 'CONTinuous' class TpcPowerControl(Enum): """Specifies Up Link power control types.""" MIN_POWER = 'MINPower' MAX_POWER = 'MAXPower' CONSTANT = 'CONStant' SINGLE = 'SINGle' UDSINGLE = 'UDSingle' UDCONTINUOUS = 'UDContinuous' ALTERNATE = 'ALT0' CLOSED_LOOP = 'CLOop' RP_CONTROL = 'RPControl' FLEX_POWER = 'FULPower' class ReducedPdcch(Enum): """Enables/disables the reduction of PDCCH resources.""" ON = 'ON' OFF = 'OFF' class Cmw500(abstract_inst.SocketInstrument): def __init__(self, ip_addr, port): """Init method to setup variables for controllers. Args: ip_addr: Controller's ip address. port: Port """ super(Cmw500, self).__init__(ip_addr, port) self._connect_socket() self._send('*CLS') self._send('*ESE 0;*SRE 0') self._send('*CLS') self._send('*ESE 1;*SRE 4') self._send('SYST:DISP:UPD ON') def switch_lte_signalling(self, state): """ Turns LTE signalling ON/OFF. Args: state: an instance of LteState indicating the state to which LTE signal has to be set. """ if not isinstance(state, LteState): raise ValueError('state should be the instance of LteState.') state = state.value cmd = 'SOURce:LTE:SIGN:CELL:STATe {}'.format(state) self.send_and_recv(cmd) time_elapsed = 0 while time_elapsed < STATE_CHANGE_TIMEOUT: response = self.send_and_recv('SOURce:LTE:SIGN:CELL:STATe:ALL?') if response == state + ',ADJ': self._logger.info('LTE signalling is now {}.'.format(state)) break # Wait for a second and increase time count by one time.sleep(1) time_elapsed += 1 else: raise CmwError('Failed to turn {} LTE signalling.'.format(state)) def switch_scc_state(self, scc_index, state): """ Changes the SCC to the requested state. Args: scc_index: the SCC number to modify. state: an instance of SccState indicating which SCC state to set. """ cmd = 'CALL:LTE:SIGN:SCC{}:ACTion {}'.format(scc_index, state.value) self.send_and_recv(cmd) self.wait_for_scc_state(scc_index, [state]) def wait_for_scc_state(self, scc_index, allowed, timeout=120): """ Polls for a SCC to reach the requested state. Args: scc_index: an integer defining the scc number. allowed: a list of SccStates defining the allowed states. timeout: the maximum amount of time to wait for the requested state. """ self.wait_for_response( 'FETCh:LTE:SIGN:SCC{}:STATe?'.format(scc_index), [a.value for a in allowed], timeout, ) def wait_for_response(self, cmd, allowed, timeout=120): """Polls a specific query command until an allowed response is returned. Args: cmd: the query command string. allowed: a collection of allowed string responses. timeout: the maximum amount of time to wait for an allowed response. """ time_elapsed = 0 while time_elapsed < timeout: response = self.send_and_recv(cmd) if response in allowed: return time.sleep(1) time_elapsed += 1 raise CmwError('Failed to wait for valid response.') def enable_packet_switching(self): """Enable packet switching in call box.""" self.send_and_recv('CALL:LTE:SIGN:PSWitched:ACTion CONNect') self.wait_for_pswitched_state() def disable_packet_switching(self): """Disable packet switching in call box.""" self.send_and_recv('CALL:LTE:SIGN:PSWitched:ACTion DISConnect') self.wait_for_pswitched_state() @property def use_carrier_specific(self): """Gets current status of carrier specific duplex configuration.""" return self.send_and_recv('CONFigure:LTE:SIGN:DMODe:UCSPECific?') @use_carrier_specific.setter def use_carrier_specific(self, state): """Sets the carrier specific duplex configuration. Args: state: ON/OFF UCS configuration. """ cmd = 'CONFigure:LTE:SIGN:DMODe:UCSPECific {}'.format(state) self.send_and_recv(cmd) def send_and_recv(self, cmd): """Send and recv the status of the command. Args: cmd: Command to send. Returns: status: returns the status of the command sent. """ self._send(cmd) if '?' in cmd: status = self._recv() return status def configure_mimo_settings(self, mimo): """Sets the mimo scenario for the test. Args: mimo: mimo scenario to set. """ cmd = 'ROUTe:LTE:SIGN:SCENario:{}'.format(mimo.value) self.send_and_recv(cmd) def wait_for_pswitched_state(self, timeout=10): """Wait until pswitched state. Args: timeout: timeout for lte pswitched state. Raises: CmwError on timeout. """ while timeout > 0: state = self.send_and_recv('FETCh:LTE:SIGN:PSWitched:STATe?') if state == LTE_PSWITCHED_ON_RESP: self._logger.debug('Connection to setup initiated.') break elif state == LTE_PSWITCHED_OFF_RESP: self._logger.debug('Connection to setup detached.') break # Wait for a second and decrease count by one time.sleep(1) timeout -= 1 else: raise CmwError('Failure in setting up/detaching connection') def wait_for_attached_state(self, timeout=120): """Attach the controller with device. Args: timeout: timeout for phone to get attached. Raises: CmwError on time out. """ while timeout > 0: state = self.send_and_recv('FETCh:LTE:SIGN:PSWitched:STATe?') if state == LTE_ATTACH_RESP: self._logger.debug('Call box attached with device') break # Wait for a second and decrease count by one time.sleep(1) timeout -= 1 else: raise CmwError('Device could not be attached') def wait_for_rrc_state(self, state, timeout=120): """ Waits until a certain RRC state is set. Args: state: the RRC state that is being waited for. timeout: timeout for phone to be in connected state. Raises: CmwError on time out. """ if state not in [LTE_CONN_RESP, LTE_IDLE_RESP]: raise ValueError( 'The allowed values for state are {} and {}.'.format( LTE_CONN_RESP, LTE_IDLE_RESP)) while timeout > 0: new_state = self.send_and_recv('SENSe:LTE:SIGN:RRCState?') if new_state == state: self._logger.debug('The RRC state is {}.'.format(new_state)) break # Wait for a second and decrease count by one time.sleep(1) timeout -= 1 else: raise CmwError('Timeout before RRC state was {}.'.format(state)) def stop_all_signalling(self): """Turns off all signaling applications, generators, or measurements.""" self.send_and_recv('SYSTem:SIGNaling:ALL:OFF') self.wait_until_quiet() def wait_until_quiet(self): """Waits for all pending operations to stop on the callbox.""" self.send_and_recv('*OPC?') def reset(self): """System level reset""" self.send_and_recv('*RST; *OPC') @property def get_instrument_id(self): """Gets instrument identification number""" return self.send_and_recv('*IDN?') def disconnect(self): """Disconnect controller from device and switch to local mode.""" self.switch_lte_signalling(LteState.LTE_OFF) self.close_remote_mode() self._close_socket() def close_remote_mode(self): """Exits remote mode to local mode.""" self.send_and_recv('>L') def detach(self): """Detach callbox and controller.""" self.send_and_recv('CALL:LTE:SIGN:PSWitched:ACTion DETach') @property def rrc_connection(self): """Gets the RRC connection state.""" return self.send_and_recv('CONFigure:LTE:SIGN:CONNection:KRRC?') @rrc_connection.setter def rrc_connection(self, state): """Selects whether the RRC connection is kept or released after attach. Args: mode: RRC State ON/OFF. """ if not isinstance(state, RrcState): raise ValueError('state should be the instance of RrcState.') cmd = 'CONFigure:LTE:SIGN:CONNection:KRRC {}'.format(state.value) self.send_and_recv(cmd) @property def rrc_connection_timer(self): """Gets the inactivity timeout for disabled rrc connection.""" return self.send_and_recv('CONFigure:LTE:SIGN:CONNection:RITimer?') @rrc_connection_timer.setter def rrc_connection_timer(self, time_in_secs): """Sets the inactivity timeout for disabled rrc connection. By default the timeout is set to 5. Args: time_in_secs: timeout of inactivity in rrc connection. """ cmd = 'CONFigure:LTE:SIGN:CONNection:RITimer {}'.format(time_in_secs) self.send_and_recv(cmd) @property def scc_activation_mode(self): """Gets the activation mode to use for SCCs when establishing a connection. """ return self.send_and_recv('CONFigure:LTE:SIGN:SCC:AMODe?') @scc_activation_mode.setter def scc_activation_mode(self, activation_mode): """Sets the activation mode to use with SCCs when establishing a connection. Args: activation_mode: the scc activation mode to use. """ if not isinstance(activation_mode, SccActivationMode): raise ValueError('state should be the instance of RrcState.') cmd = 'CONFigure:LTE:SIGN:SCC:AMODe {}'.format(activation_mode.value) self.send_and_recv(cmd) @property def dl_mac_padding(self): """Gets the state of mac padding.""" return self.send_and_recv('CONFigure:LTE:SIGN:CONNection:DLPadding?') @dl_mac_padding.setter def dl_mac_padding(self, state): """Enables/Disables downlink padding at the mac layer. Args: state: ON/OFF """ cmd = 'CONFigure:LTE:SIGN:CONNection:DLPadding {}'.format(state.value) self.send_and_recv(cmd) @property def connection_type(self): """Gets the connection type applied in callbox.""" return self.send_and_recv('CONFigure:LTE:SIGN:CONNection:CTYPe?') @connection_type.setter def connection_type(self, ctype): """Sets the connection type to be applied. Args: ctype: Connection type. """ cmd = 'CONFigure:LTE:SIGN:CONNection:CTYPe {}'.format(ctype.value) self.send_and_recv(cmd) @property def drx_connected_mode(self): """ Gets the Connected DRX LTE cell parameter Args: None Returns: DRX connected mode (ON, OFF, USER_DEFINED, DRX_S, DRX_L) """ cmd = 'CONFigure:LTE:SIGN:CONNection:CDRX:ENABle?' return self.send_and_recv(cmd) @drx_connected_mode.setter def drx_connected_mode(self, mode): """ Sets the Connected DRX LTE cell parameter Args: mode: DRX mode Returns: None """ if not isinstance(mode, DrxMode): raise ValueError('state should be the instance of DrxMode.') cmd = 'CONFigure:LTE:SIGN:CONNection:CDRX:ENABle {}'.format(mode.value) self.send_and_recv(cmd) @property def drx_on_duration_timer(self): """ Gets the amount of PDCCH subframes to wait for data after waking up from a DRX cycle Args: None Returns: DRX mode duration timer """ cmd = 'CONFigure:LTE:SIGN:CONNection:CDRX:ODTimer?' return self.send_and_recv(cmd) @drx_on_duration_timer.setter def drx_on_duration_timer(self, time): """ Sets the amount of PDCCH subframes to wait for data after waking up from a DRX cycle Args: timer: Length of interval to wait for user data to be transmitted Returns: None """ cmd = 'CONFigure:LTE:SIGN:CONNection:CDRX:ODTimer {}'.format(time) self.send_and_recv(cmd) @property def drx_inactivity_timer(self): """ Gets the number of PDCCH subframes to wait before entering DRX mode Args: None Returns: DRX mode inactivity timer """ cmd = 'CONFigure:LTE:SIGN:CONNection:CDRX:ITIMer?' return self.send_and_recv(cmd) @drx_inactivity_timer.setter def drx_inactivity_timer(self, time): """ Sets the number of PDCCH subframes to wait before entering DRX mode Args: timer: Length of the interval to wait Returns: None """ cmd = 'CONFigure:LTE:SIGN:CONNection:CDRX:ITIMer {}'.format(time) self.send_and_recv(cmd) @property def drx_retransmission_timer(self): """ Gets the number of consecutive PDCCH subframes to wait for retransmission Args: None Returns: Number of PDCCH subframes to wait for retransmission """ cmd = 'CONFigure:LTE:SIGN:CONNection:CDRX:RTIMer?' self.send_and_recv(cmd) @drx_retransmission_timer.setter def drx_retransmission_timer(self, time): """ Sets the number of consecutive PDCCH subframes to wait for retransmission Args: time: Number of PDCCH subframes to wait for retransmission Returns: None """ cmd = 'CONFigure:LTE:SIGN:CONNection:CDRX:RTIMer {}'.format(time) self.send_and_recv(cmd) @property def drx_long_cycle(self): """ Gets the amount of subframes representing a DRX long cycle Args: None Returns: The amount of subframes representing one long DRX cycle. One cycle consists of DRX sleep + DRX on duration """ cmd = 'CONFigure:LTE:SIGN:CONNection:CDRX:LDCYcle?' return self.send_and_recv(cmd) @drx_long_cycle.setter def drx_long_cycle(self, long_cycle): """ Sets the amount of subframes representing a DRX long cycle Args: long_cycle: The amount of subframes representing one long DRX cycle. One cycle consists of DRX sleep + DRX on duration Returns: None """ cmd = 'CONFigure:LTE:SIGN:CONNection:CDRX:LDCYcle {}'.format( long_cycle) self.send_and_recv(cmd) @property def drx_long_cycle_offset(self): """ Gets the offset used to determine long cycle starting subframe Args: None Returns: Long cycle offset """ cmd = 'CONFigure:LTE:SIGN:CONNection:CDRX:SOFFset?' return self.send_and_recv(cmd) @drx_long_cycle_offset.setter def drx_long_cycle_offset(self, offset): """ Sets the offset used to determine long cycle starting subframe Args: offset: Number in range 0...(long cycle - 1) """ cmd = 'CONFigure:LTE:SIGN:CONNection:CDRX:SOFFset {}'.format(offset) self.send_and_recv(cmd) @property def apn(self): """Gets the callbox network Access Point Name.""" cmd = 'CONFigure:LTE:SIGNaling:CONNection:APN?' return self.send_and_recv(cmd) @apn.setter def apn(self, apn): """Sets the callbox network Access Point Name. Args: apn: the APN name """ cmd = 'CONFigure:LTE:SIGNaling:CONNection:APN {}'.format(apn) self.send_and_recv(cmd) @property def ip_type(self): """Gets the callbox network IP type.""" cmd = 'CONFigure:LTE:SIGNaling:CONNection:IPVersion?' return self.send_and_recv(cmd) @ip_type.setter def ip_type(self, ip_type): """ Configures the callbox network IP type. Args: ip_type: the network type to use. """ if not isinstance(ip_type, IPAddressType): raise ValueError('state should be the instance of IPAddressType.') cmd = 'CONFigure:LTE:SIGNaling:CONNection:IPVersion {}'.format( ip_type.value) return self.send_and_recv(cmd) @property def mtu(self): """Gets the callbox network Maximum Transmission Unit.""" cmd = 'CONFigure:DATA:CONTrol:MTU?' return self.send_and_recv(cmd) @mtu.setter def mtu(self, mtu): """Sets the callbox network Maximum Transmission Unit. Args: mtu: the MTU size. """ cmd = 'CONFigure:DATA:CONTrol:MTU {}'.format(mtu) self.send_and_recv(cmd) def get_base_station(self, bts_num=BtsNumber.BTS1): """Gets the base station object based on bts num. By default bts_num set to PCC Args: bts_num: base station identifier Returns: base station object. """ return BaseStation(self, bts_num) def init_lte_measurement(self): """Gets the class object for lte measurement which can be used to initiate measurements. Returns: lte measurement object. """ return LteMeasurement(self) def set_sms(self, message): """Sets the SMS message to be sent to the DUT. Args: message: the SMS message to send. """ cmd = 'CONFigure:LTE:SIGN:SMS:OUTGoing:INTernal "{}"'.format(message) self.send_and_recv(cmd) def send_sms(self): """Sends the currently set SMS message.""" self.send_and_recv('CALL:LTE:SIGN:PSWitched:ACTion SMS;*OPC?') timeout = time.time() + STATE_CHANGE_TIMEOUT while time.time() < timeout: state = self.send_and_recv( 'SENSe:LTE:SIGN:SMS:OUTGoing:INFO:LMSent?') if state == "SUCC": return time.sleep(1) raise CmwError('Failed to send SMS message') class BaseStation(object): """Class to interact with different base stations""" def __init__(self, cmw, bts_num): if not isinstance(bts_num, BtsNumber): raise ValueError('bts_num should be an instance of BtsNumber.') self._bts = bts_num.value self._cmw = cmw @property def scc_state(self): """Gets the scc state of cell.""" cmd = 'FETCh:LTE:SIGN:{}:STATe?'.format(self._bts) return self._cmw.send_and_recv(cmd) @property def duplex_mode(self): """Gets current duplex of cell.""" cmd = 'CONFigure:LTE:SIGN:{}:DMODe?'.format(self._bts) return self._cmw.send_and_recv(cmd) @duplex_mode.setter def duplex_mode(self, mode): """Sets the Duplex mode of cell. Args: mode: String indicating FDD or TDD. """ if not isinstance(mode, DuplexMode): raise ValueError('mode should be an instance of DuplexMode.') cmd = 'CONFigure:LTE:SIGN:{}:DMODe {}'.format(self._bts, mode.value) self._cmw.send_and_recv(cmd) @property def band(self): """Gets the current band of cell.""" cmd = 'CONFigure:LTE:SIGN:{}:BAND?'.format(self._bts) return self._cmw.send_and_recv(cmd) @band.setter def band(self, band): """Sets the Band of cell. Args: band: band of cell. """ cmd = 'CONFigure:LTE:SIGN:{}:BAND {}'.format(self._bts, band) self._cmw.send_and_recv(cmd) @property def dl_channel(self): """Gets the downlink channel of cell.""" cmd = 'CONFigure:LTE:SIGN:RFSettings:{}:CHANnel:DL?'.format(self._bts) return self._cmw.send_and_recv(cmd) @dl_channel.setter def dl_channel(self, channel): """Sets the downlink channel number of cell. Args: channel: downlink channel number of cell. """ cmd = 'CONFigure:LTE:SIGN:RFSettings:{}:CHANnel:DL {}'.format( self._bts, channel) self._cmw.send_and_recv(cmd) @property def ul_channel(self): """Gets the uplink channel of cell.""" cmd = 'CONFigure:LTE:SIGN:RFSettings:{}:CHANnel:UL?'.format(self._bts) return self._cmw.send_and_recv(cmd) @ul_channel.setter def ul_channel(self, channel): """Sets the up link channel number of cell. Args: channel: up link channel number of cell. """ cmd = 'CONFigure:LTE:SIGN:RFSettings:{}:CHANnel:UL {}'.format( self._bts, channel) self._cmw.send_and_recv(cmd) @property def bandwidth(self): """Get the channel bandwidth of the cell.""" cmd = 'CONFigure:LTE:SIGN:CELL:BANDwidth:{}:DL?'.format(self._bts) return self._cmw.send_and_recv(cmd) @bandwidth.setter def bandwidth(self, bandwidth): """Sets the channel bandwidth of the cell. Args: bandwidth: channel bandwidth of cell. """ if not isinstance(bandwidth, LteBandwidth): raise ValueError('bandwidth should be an instance of ' 'LteBandwidth.') cmd = 'CONFigure:LTE:SIGN:CELL:BANDwidth:{}:DL {}'.format( self._bts, bandwidth.value) self._cmw.send_and_recv(cmd) @property def ul_frequency(self): """Get the uplink frequency of the cell.""" cmd = 'CONFigure:LTE:SIGN:RFSettings:{}:CHANnel:UL? MHZ'.format( self._bts) return self._cmw.send_and_recv(cmd) @ul_frequency.setter def ul_frequency(self, freq): """Get the uplink frequency of the cell. Args: freq: uplink frequency of the cell. """ cmd = 'CONFigure:LTE:SIGN:RFSettings:{}:CHANnel:UL {} MHZ'.format( self._bts, freq) self._cmw.send_and_recv(cmd) @property def dl_frequency(self): """Get the downlink frequency of the cell""" cmd = 'CONFigure:LTE:SIGN:RFSettings:{}:CHANnel:DL? MHZ'.format( self._bts) return self._cmw.send_and_recv(cmd) @dl_frequency.setter def dl_frequency(self, freq): """Get the downlink frequency of the cell. Args: freq: downlink frequency of the cell. """ cmd = 'CONFigure:LTE:SIGN:RFSettings:{}:CHANnel:DL {} MHZ'.format( self._bts, freq) self._cmw.send_and_recv(cmd) @property def transmode(self): """Gets the TM of cell.""" cmd = 'CONFigure:LTE:SIGN:CONNection:{}:TRANsmission?'.format( self._bts) return self._cmw.send_and_recv(cmd) @transmode.setter def transmode(self, tm_mode): """Sets the TM of cell. Args: tm_mode: TM of cell. """ if not isinstance(tm_mode, TransmissionModes): raise ValueError('tm_mode should be an instance of ' 'Transmission modes.') cmd = 'CONFigure:LTE:SIGN:CONNection:{}:TRANsmission {}'.format( self._bts, tm_mode.value) self._cmw.send_and_recv(cmd) @property def downlink_power_level(self): """Gets RSPRE level.""" cmd = 'CONFigure:LTE:SIGN:DL:{}:RSEPre:LEVel?'.format(self._bts) return self._cmw.send_and_recv(cmd) @downlink_power_level.setter def downlink_power_level(self, pwlevel): """Modifies RSPRE level. Args: pwlevel: power level in dBm. """ cmd = 'CONFigure:LTE:SIGN:DL:{}:RSEPre:LEVel {}'.format( self._bts, pwlevel) self._cmw.send_and_recv(cmd) @property def uplink_power_control(self): """Gets open loop nominal power directly.""" cmd = 'CONFigure:LTE:SIGN:UL:{}:PUSCh:OLNPower?'.format(self._bts) return self._cmw.send_and_recv(cmd) @uplink_power_control.setter def uplink_power_control(self, ul_power): """Sets open loop nominal power directly. Args: ul_power: uplink power level. """ cmd = 'CONFigure:LTE:SIGN:UL:{}:PUSCh:OLNPower {}'.format( self._bts, ul_power) self._cmw.send_and_recv(cmd) @property def uldl_configuration(self): """Gets uldl configuration of the cell.""" cmd = 'CONFigure:LTE:SIGN:CELL:{}:ULDL?'.format(self._bts) return self._cmw.send_and_recv(cmd) @uldl_configuration.setter def uldl_configuration(self, uldl): """Sets the ul-dl configuration. Args: uldl: Configuration value ranging from 0 to 6. """ if uldl not in range(0, 7): raise ValueError('uldl configuration value should be between' ' 0 and 6 inclusive.') cmd = 'CONFigure:LTE:SIGN:CELL:{}:ULDL {}'.format(self._bts, uldl) self._cmw.send_and_recv(cmd) @property def tdd_special_subframe(self): """Gets special subframe of the cell.""" cmd = 'CONFigure:LTE:SIGN:CELL:{}:SSUBframe?'.format(self._bts) return self._cmw.send_and_recv(cmd) @tdd_special_subframe.setter def tdd_special_subframe(self, sframe): """Sets the tdd special subframe of the cell. Args: sframe: Integer value ranging from 1 to 9. """ if sframe not in range(0, 10): raise ValueError('tdd special subframe should be between 0 and 9' ' inclusive.') cmd = 'CONFigure:LTE:SIGN:CELL:{}:SSUBframe {}'.format( self._bts, sframe) self._cmw.send_and_recv(cmd) @property def scheduling_mode(self): """Gets the current scheduling mode.""" cmd = 'CONFigure:LTE:SIGN:CONNection:{}:STYPe?'.format(self._bts) return self._cmw.send_and_recv(cmd) @scheduling_mode.setter def scheduling_mode(self, mode): """Sets the scheduling type for the cell. Args: mode: Selects the channel mode to be scheduled. """ if not isinstance(mode, SchedulingMode): raise ValueError('mode should be the instance of scheduling mode.') cmd = 'CONFigure:LTE:SIGN:CONNection:{}:STYPe {}'.format( self._bts, mode.value) self._cmw.send_and_recv(cmd) @property def rb_configuration_dl(self): """Gets rmc's rb configuration for down link. This function returns Number of Resource blocks, Resource block position and Modulation type. """ cmd = 'CONFigure:LTE:SIGN:CONNection:{}:{}:DL?'.format( self._bts, self.scheduling_mode) return self._cmw.send_and_recv(cmd) @rb_configuration_dl.setter def rb_configuration_dl(self, rb_config): """Sets the rb configuration for down link for scheduling type. Args: rb_config: Tuple containing Number of resource blocks, resource block position and modulation type. Raises: ValueError: If tuple unpacking fails. """ if self.scheduling_mode == 'RMC': rb, rb_pos, modulation = rb_config cmd = ('CONFigure:LTE:SIGN:CONNection:{}:RMC:DL {},{},' '{}'.format(self._bts, rb, rb_pos, modulation)) self._cmw.send_and_recv(cmd) elif self.scheduling_mode == 'UDCH': rb, start_rb, modulation, tbs = rb_config self.validate_rb(rb) if not isinstance(modulation, ModulationType): raise ValueError('Modulation should be of type ' 'ModulationType.') cmd = ('CONFigure:LTE:SIGN:CONNection:{}:UDCHannels:DL {},{},' '{},{}'.format(self._bts, rb, start_rb, modulation.value, tbs)) self._cmw.send_and_recv(cmd) @property def rb_configuration_ul(self): """Gets rb configuration for up link. This function returns Number of Resource blocks, Resource block position and Modulation type. """ cmd = 'CONFigure:LTE:SIGN:CONNection:{}:{}:UL?'.format( self._bts, self.scheduling_mode) return self._cmw.send_and_recv(cmd) @rb_configuration_ul.setter def rb_configuration_ul(self, rb_config): """Sets the rb configuration for down link for scheduling mode. Args: rb_config: Tuple containing Number of resource blocks, resource block position and modulation type. Raises: ValueError: If tuple unpacking fails. """ if self.scheduling_mode == 'RMC': rb, rb_pos, modulation = rb_config cmd = ('CONFigure:LTE:SIGN:CONNection:{}:RMC:UL {},{},' '{}'.format(self._bts, rb, rb_pos, modulation)) self._cmw.send_and_recv(cmd) elif self.scheduling_mode == 'UDCH': rb, start_rb, modulation, tbs = rb_config self.validate_rb(rb) if not isinstance(modulation, ModulationType): raise ValueError('Modulation should be of type ' 'ModulationType.') cmd = ('CONFigure:LTE:SIGN:CONNection:{}:UDCHannels:UL {},{},' '{},{}'.format(self._bts, rb, start_rb, modulation.value, tbs)) self._cmw.send_and_recv(cmd) def validate_rb(self, rb): """Validates if rb is within the limits for bandwidth set. Args: rb: No. of resource blocks. Raises: ValueError if rb out of range. """ bandwidth = self.bandwidth if bandwidth == LteBandwidth.BANDWIDTH_1MHz.value: if not 0 <= rb <= 6: raise ValueError('RB should be between 0 to 6 inclusive' ' for 1.4Mhz.') elif bandwidth == LteBandwidth.BANDWIDTH_3MHz.value: if not 0 <= rb <= 10: raise ValueError('RB should be between 0 to 10 inclusive' ' for 3 Mhz.') elif bandwidth == LteBandwidth.BANDWIDTH_5MHz.value: if not 0 <= rb <= 25: raise ValueError('RB should be between 0 to 25 inclusive' ' for 5 Mhz.') elif bandwidth == LteBandwidth.BANDWIDTH_10MHz.value: if not 0 <= rb <= 50: raise ValueError('RB should be between 0 to 50 inclusive' ' for 10 Mhz.') elif bandwidth == LteBandwidth.BANDWIDTH_15MHz.value: if not 0 <= rb <= 75: raise ValueError('RB should be between 0 to 75 inclusive' ' for 15 Mhz.') elif bandwidth == LteBandwidth.BANDWIDTH_20MHz.value: if not 0 <= rb <= 100: raise ValueError('RB should be between 0 to 100 inclusive' ' for 20 Mhz.') @property def rb_position_dl(self): """Gets the position of the allocated down link resource blocks within the channel band-width. """ cmd = 'CONFigure:LTE:SIGN:CONNection:{}:RMC:RBPosition:DL?'.format( self._bts) return self._cmw.send_and_recv(cmd) @rb_position_dl.setter def rb_position_dl(self, rbpos): """Selects the position of the allocated down link resource blocks within the channel band-width Args: rbpos: position of resource blocks. """ if not isinstance(rbpos, RbPosition): raise ValueError('rbpos should be the instance of RbPosition.') cmd = 'CONFigure:LTE:SIGN:CONNection:{}:RMC:RBPosition:DL {}'.format( self._bts, rbpos.value) self._cmw.send_and_recv(cmd) @property def rb_position_ul(self): """Gets the position of the allocated up link resource blocks within the channel band-width. """ cmd = 'CONFigure:LTE:SIGN:CONNection:{}:RMC:RBPosition:UL?'.format( self._bts) return self._cmw.send_and_recv(cmd) @rb_position_ul.setter def rb_position_ul(self, rbpos): """Selects the position of the allocated up link resource blocks within the channel band-width. Args: rbpos: position of resource blocks. """ if not isinstance(rbpos, RbPosition): raise ValueError('rbpos should be the instance of RbPosition.') cmd = 'CONFigure:LTE:SIGN:CONNection:{}:RMC:RBPosition:UL {}'.format( self._bts, rbpos.value) self._cmw.send_and_recv(cmd) @property def dci_format(self): """Gets the downlink control information (DCI) format.""" cmd = 'CONFigure:LTE:SIGN:CONNection:{}:DCIFormat?'.format(self._bts) return self._cmw.send_and_recv(cmd) @dci_format.setter def dci_format(self, dci_format): """Selects the downlink control information (DCI) format. Args: dci_format: supported dci. """ if not isinstance(dci_format, DciFormat): raise ValueError('dci_format should be the instance of DciFormat.') cmd = 'CONFigure:LTE:SIGN:CONNection:{}:DCIFormat {}'.format( self._bts, dci_format.value) self._cmw.send_and_recv(cmd) @property def dl_antenna(self): """Gets dl antenna count of cell.""" cmd = 'CONFigure:LTE:SIGN:CONNection:{}:NENBantennas?'.format( self._bts) return self._cmw.send_and_recv(cmd) @dl_antenna.setter def dl_antenna(self, num_antenna): """Sets the dl antenna count of cell. Args: num_antenna: Count of number of dl antennas to use. """ if not isinstance(num_antenna, MimoModes): raise ValueError('num_antenna should be an instance of MimoModes.') cmd = 'CONFigure:LTE:SIGN:CONNection:{}:NENBantennas {}'.format( self._bts, num_antenna.value) self._cmw.send_and_recv(cmd) @property def reduced_pdcch(self): """Gets the reduction of PDCCH resources state.""" cmd = 'CONFigure:LTE:SIGN:CONNection:{}:PDCCh:RPDCch?'.format( self._bts) return self._cmw.send_and_recv(cmd) @reduced_pdcch.setter def reduced_pdcch(self, state): """Sets the reduction of PDCCH resources state. Args: state: ON/OFF. """ cmd = 'CONFigure:LTE:SIGN:CONNection:{}:PDCCh:RPDCch {}'.format( self._bts, state.value) self._cmw.send_and_recv(cmd) @property def tpc_power_control(self): """Gets the type of uplink power control used.""" cmd = 'CONFigure:LTE:SIGN:UL:{}:PUSCh:TPC:SET?'.format(self._bts) return self._cmw.send_and_recv(cmd) @tpc_power_control.setter def tpc_power_control(self, set_type): """Set and execute the Up Link Power Control via TPC. Args: set_type: Type of tpc power control. """ if not isinstance(set_type, TpcPowerControl): raise ValueError('set_type should be the instance of ' 'TpCPowerControl.') cmd = 'CONFigure:LTE:SIGN:UL:{}:PUSCh:TPC:SET {}'.format( self._bts, set_type.value) self._cmw.send_and_recv(cmd) cmd = 'CONFigure:LTE:SIGN:UL:{}:PUSCh:TPC:PEXecute'.format(self._bts) self._cmw.send_and_recv(cmd) @property def tpc_closed_loop_target_power(self): """Gets the target powers for power control with the TPC setup.""" cmd = 'CONFigure:LTE:SIGN:UL:{}:PUSCh:TPC:CLTPower?'.format(self._bts) return self._cmw.send_and_recv(cmd) @tpc_closed_loop_target_power.setter def tpc_closed_loop_target_power(self, cltpower): """Sets the target powers for power control with the TPC setup. Args: tpower: Target power. """ cmd = 'CONFigure:LTE:SIGN:UL:{}:PUSCh:TPC:CLTPower {}'.format( self._bts, cltpower) self._cmw.send_and_recv(cmd) class LteMeasurementState(Enum): """Possible measurement states""" OFF = 'OFF' READY = 'RDY' RUN = 'RUN' def _try_parse(s, fun): try: return fun(s) except ValueError: return None class LteMeasurement(object): """ Class for measuring LTE performance """ INDEX_TX = 17 INDEX_TX_MIN = 17 INDEX_TX_MAX = 18 def __init__(self, cmw): self._cmw = cmw @property def sample_count(self): """Gets the number of samples to use when calculating results.""" cmd = 'CONFigure:LTE:MEAS:MEValuation:SCOunt:MODulation?' return self._cmw.send_and_recv(cmd) @sample_count.setter def sample_count(self, sample_count): cmd = 'CONFigure:LTE:MEAS:MEValuation:SCOunt:MODulation {}'.format( sample_count) self._cmw.send_and_recv(cmd) @property def average(self): """Gets the average values of the most recent measurement. Invalid measurements are set to None. """ values = self._cmw.send_and_recv( 'FETch:LTE:MEAS:MEValuation:MODulation:AVERage?').split(',') for i in range(0, 2): values[i] = _try_parse(values[i], int) for i in range(2, len(values)): values[i] = _try_parse(values[i], float) return values @property def extrema(self): """Gets the extrema values of the most recent measurement. Invalid measurements are set to None. """ values = self._cmw.send_and_recv( 'FETch:LTE:MEAS:MEValuation:MODulation:EXTReme?').split(',') for i in range(0, 2): values[i] = _try_parse(values[i], int) for i in range(2, len(values)): values[i] = _try_parse(values[i], float) return values @property def stdev(self): """Gets the standard deviation of the most recent measurement. Invalid measurements are set to None. """ values = self._cmw.send_and_recv( 'FETch:LTE:MEAS:MEValuation:MODulation:SDEV?').split(',') for i in range(0, 2): values[i] = _try_parse(values[i], int) for i in range(2, len(values)): values[i] = _try_parse(values[i], float) return values @property def tx_average(self): """Gets the measured average Tx power (in dBm).""" value = self.average[self.INDEX_TX] if value is None: raise ValueError("LteMeasurement has no value for tx_average") return value @property def tx_min(self): """Gets the measured minimum Tx power (in dBm).""" value = self.extrema[self.INDEX_TX_MIN] if value is None: raise ValueError("LteMeasurement has no value for tx_min") return value @property def tx_max(self): """Gets the measured maximum Tx power (in dBm).""" value = self.extrema[self.INDEX_TX_MAX] if value is None: raise ValueError("LteMeasurement has no value for tx_max") return value @property def tx_stdev(self): """Gets the measured Tx power standard deviation (in dBm).""" value = self.stdev[self.INDEX_TX] if value is None: raise ValueError( "LteMeasurement has no value for standard_deviation") return value @property def measurement_repetition(self): """Returns the measurement repetition mode that has been set.""" return self._cmw.send_and_recv( 'CONFigure:LTE:MEAS:MEValuation:REPetition?') @measurement_repetition.setter def measurement_repetition(self, mode): """Sets the mode for measuring power levels. Args: mode: Single shot/continuous. """ if not isinstance(mode, RepetitionMode): raise ValueError('mode should be the instance of Repetition Mode') cmd = 'CONFigure:LTE:MEAS:MEValuation:REPetition {}'.format(mode.value) self._cmw.send_and_recv(cmd) @property def query_measurement_state(self): """Returns the states and sub states of measurement.""" return self._cmw.send_and_recv('FETCh:LTE:MEAS:MEValuation:STATe:ALL?') @property def measure_tx_power(self): """Return the current Tx power measurement.""" return self._cmw.send_and_recv( 'FETCh:LTE:MEAS:MEValuation:PMONitor:AVERage?') @property def state(self): """Gets the state of the measurement.""" cmd = 'FETCh:LTE:MEAS:MEValuation:STATe?' return LteMeasurementState(self._cmw.send_and_recv(cmd)) def initialize_measurement(self): """Initialize measurement modules.""" self._cmw.send_and_recv( 'CONF:LTE:MEAS:MEV:RES:ALL ON,ON,ON,ON,ON,ON,ON,ON,ON,ON,ON,ON') self._cmw.send_and_recv('ROUTe:LTE:MEAS:SCENario:CSPath "LTE Sig1"') self._cmw.send_and_recv('INIT:LTE:MEAS:MEValuation;*OPC?') self._wait_for_state({LteMeasurementState.RUN}) def run_measurement(self): """Runs a single Tx multievaluation measurement to completion.""" self.stop_measurement() self.measurement_repetition = RepetitionMode.SINGLESHOT self.initialize_measurement() self._wait_for_state({LteMeasurementState.READY}, timeout=120) def stop_measurement(self): """Stops the on-going measurement. This function call does not free up resources allocated for measurement. Instead it moves from RUN to RDY state. """ self._cmw.send_and_recv('STOP:LTE:MEAS:MEValuation') self._wait_for_state( {LteMeasurementState.OFF, LteMeasurementState.READY}) def abort_measurement(self): """Aborts the measurement abruptly. This function call will free up the resources allocated for measurement and all the results will be wiped off. """ self._cmw.send_and_recv('ABORt:LTE:MEAS:MEValuation') self._wait_for_state({LteMeasurementState.OFF}) def _wait_for_state(self, states, timeout=10): """Polls the measurement state until it reaches an allowable state Args: states: the allowed states timeout: the maximum amount time to wait (in seconds) """ while timeout > 0: if self.state in states: return time.sleep(1) timeout -= 1 raise CmwError( 'Failed to wait for LTE measurement state: {}.'.format(states)) class CmwError(Exception): """Class to raise exceptions related to cmw."""