#!/usr/bin/env python3.4
#
#   Copyright 2016 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.
"""Classes for accessing, managing, and manipulating attenuators.

Users will instantiate a specific child class, but almost all operation should
be performed on the methods and data members defined here in the base classes
or the wrapper classes.

The module-level create() and destroy() functions build Attenuator objects
from a list of configuration dicts and close their instruments again.
"""

import importlib
import logging

from acts.keys import Config
from acts.libs.proc import job

ACTS_CONTROLLER_CONFIG_NAME = 'Attenuator'
ACTS_CONTROLLER_REFERENCE_NAME = 'attenuators'
_ATTENUATOR_OPEN_RETRIES = 3


def _log_connection_diagnostics(ip_address, port):
    """Logs whether the attenuator answers a ping and probes its telnet port.

    Args:
        ip_address: Address of the attenuator instrument.
        port: Port the connection attempt was made on.
    """
    ping_output = job.run(
        'ping %s -c 1 -w 1' % ip_address, ignore_status=True)
    # ping exits with 0 only when a reply was received; any other status
    # (no reply, unknown host, ...) means the device was not reached.
    if ping_output.exit_status != 0:
        logging.error('Unable to ping attenuator at %s', ip_address)
    else:
        logging.error('Able to ping attenuator at %s', ip_address)
        job.run(
            'echo "q" | telnet %s %s' % (ip_address, port),
            ignore_status=True)


def _open_with_retries(attn_inst, ip_address, port):
    """Opens the instrument connection, retrying on failure.

    Up to _ATTENUATOR_OPEN_RETRIES attempts are made and the first successful
    attempt ends the loop. When the final attempt fails, connectivity
    diagnostics are logged and the exception from that attempt is re-raised.

    Args:
        attn_inst: The instrument object whose open() method is called.
        ip_address: Address passed to open().
        port: Port passed to open().
    """
    for attempt_number in range(1, _ATTENUATOR_OPEN_RETRIES + 1):
        try:
            attn_inst.open(ip_address, port)
        except Exception as e:
            logging.error(
                'Attempt %s to open connection to attenuator '
                'failed: %s', attempt_number, e)
            if attempt_number == _ATTENUATOR_OPEN_RETRIES:
                _log_connection_diagnostics(ip_address, port)
                raise
        else:
            # Connected; do not re-open the link on the remaining attempts.
            return


def create(configs):
    """Creates Attenuator objects for every configured instrument.

    For each config the instrument module
    'acts.controllers.attenuator_lib.<Model>.<Protocol>' is imported, its
    AttenuatorInstrument is instantiated and opened, and one Attenuator is
    created per index in range(InstrumentCount).

    Args:
        configs: An iterable of dicts. Each dict is read for the keys
            'Model' and 'InstrumentCount', the address and port keys named by
            acts.keys.Config, and optionally 'Protocol' (defaults to
            'telnet') and 'Paths' (a sequence indexed by attenuator index,
            stored on each Attenuator as the attribute `path`).

    Returns:
        A list of Attenuator objects, in config order.

    Raises:
        IndexError: if 'Paths' is given but has no entry for an attenuator.
        Exception: whatever the instrument's open() raised on the final
            failed connection attempt.
    """
    objs = []
    for c in configs:
        attn_model = c['Model']
        # Default to telnet.
        protocol = c.get('Protocol', 'telnet')
        module_name = 'acts.controllers.attenuator_lib.%s.%s' % (attn_model,
                                                                 protocol)
        module = importlib.import_module(module_name)
        inst_cnt = c['InstrumentCount']
        attn_inst = module.AttenuatorInstrument(inst_cnt)
        attn_inst.model = attn_model

        ip_address = c[Config.key_address.value]
        port = c[Config.key_port.value]

        _open_with_retries(attn_inst, ip_address, port)

        for i in range(inst_cnt):
            attn = Attenuator(attn_inst, idx=i)
            if 'Paths' in c:
                try:
                    attn.path = c['Paths'][i]
                except IndexError:
                    logging.error('No path specified for attenuator %d.', i)
                    raise
            objs.append(attn)
    return objs


def destroy(objs):
    """Closes the instruments behind the given Attenuator objects.

    Several Attenuator objects may share one instrument, so each distinct
    instrument is closed only once.

    Args:
        objs: An iterable of Attenuator objects, as returned by create().
    """
    closed_ids = set()
    for attn in objs:
        instrument = attn.instrument
        if id(instrument) in closed_ids:
            continue
        closed_ids.add(id(instrument))
        instrument.close()


class AttenuatorError(Exception):
    """Base class for all errors generated by Attenuator-related modules."""


class InvalidDataError(AttenuatorError):
    """Raised when an unexpected result is seen on the transport layer.

    When this exception is seen, closing and re-opening the link to the
    attenuator instrument is probably necessary. Something has gone wrong in
    the transport.
    """


class InvalidOperationError(AttenuatorError):
    """Raised when the attenuator's state does not allow the given operation.

    Certain methods may only be accessed when the instance upon which they are
    invoked is in a certain state. This indicates that the object is not in the
    correct state for a method to be called.
    """


class AttenuatorInstrument(object):
    """Defines the primitive behavior of all attenuator instruments.

    The AttenuatorInstrument class is designed to provide a simple low-level
    interface for accessing any step attenuator instrument comprised of one or
    more attenuators and a controller. All AttenuatorInstruments should override
    all the methods below and call AttenuatorInstrument.__init__ in their
    constructors. Outside of setup/teardown, devices should be accessed via
    this generic "interface".
    """
    model = None
    # Sentinel stored in max_atten until a subclass sets the real maximum.
    INVALID_MAX_ATTEN = 999.9

    def __init__(self, num_atten=0):
        """This is the Constructor for Attenuator Instrument.

        Args:
            num_atten: The number of attenuators contained within the
                instrument. In some instances setting this number to zero will
                allow the driver to auto-determine the number of attenuators;
                however, this behavior is not guaranteed.

        Raises:
            NotImplementedError if initialization is called from this class.
        """

        if type(self) is AttenuatorInstrument:
            raise NotImplementedError(
                'Base class should not be instantiated directly!')

        self.num_atten = num_atten
        self.max_atten = AttenuatorInstrument.INVALID_MAX_ATTEN
        self.properties = None

    def set_atten(self, idx, value, strict=True):
        """Sets the attenuation given its index in the instrument.

        Args:
            idx: A zero based index used to identify a particular attenuator in
                an instrument.
            value: a floating point value for nominal attenuation to be set.
            strict: if True, function raises an error when given out of
                bounds attenuation values, if false, the function sets out of
                bounds values to 0 or max_atten.

        Raises:
            NotImplementedError: always, in this base class.
        """
        raise NotImplementedError('Base class should not be called directly!')

    def get_atten(self, idx):
        """Returns the current attenuation of the attenuator at index idx.

        Args:
            idx: A zero based index used to identify a particular attenuator in
                an instrument.

        Returns:
            The current attenuation value as a floating point value

        Raises:
            NotImplementedError: always, in this base class.
        """
        raise NotImplementedError('Base class should not be called directly!')


class Attenuator(object):
    """An object representing a single attenuator in a remote instrument.

    A user wishing to abstract the mapping of attenuators to physical
    instruments should use this class, which provides an object that abstracts
    the physical implementation and allows the user to think only of attenuators
    regardless of their location.
    """

    def __init__(self, instrument, idx=0, offset=0):
        """This is the constructor for Attenuator

        Args:
            instrument: Reference to an AttenuatorInstrument on which the
                Attenuator resides
            idx: This zero-based index is the identifier for a particular
                attenuator in an instrument.
            offset: A power offset value for the attenuator to be used when
                performing future operations. This could be used for either
                calibration or to allow group operations with offsets between
                various attenuators.

        Raises:
            TypeError if an invalid AttenuatorInstrument is passed in.
            IndexError if the index is out of range.
        """
        if not isinstance(instrument, AttenuatorInstrument):
            raise TypeError('Must provide an Attenuator Instrument Ref')
        self.model = instrument.model
        self.instrument = instrument
        self.idx = idx
        self.offset = offset

        # The index is zero-based, so negative values are out of range too.
        if self.idx < 0 or self.idx >= instrument.num_atten:
            raise IndexError(
                'Attenuator index out of range for attenuator instrument')

    def set_atten(self, value, strict=True):
        """Sets the attenuation.

        Args:
            value: A floating point value for nominal attenuation to be set.
            strict: if True, function raises an error when given out of
                bounds attenuation values, if false, the function sets out of
                bounds values to 0 or max_atten.

        Raises:
            ValueError if strict is True and value + offset is greater than
            the maximum value.
        """
        if value + self.offset > self.instrument.max_atten and strict:
            raise ValueError(
                'Attenuator Value+Offset greater than Max Attenuation!')

        self.instrument.set_atten(self.idx, value + self.offset, strict)

    def get_atten(self):
        """Returns the attenuation as a float, normalized by the offset."""
        return self.instrument.get_atten(self.idx) - self.offset

    def get_max_atten(self):
        """Returns the max attenuation as a float, normalized by the offset.

        Raises:
            ValueError if the instrument's max_atten is still the
            INVALID_MAX_ATTEN sentinel.
        """
        if self.instrument.max_atten == AttenuatorInstrument.INVALID_MAX_ATTEN:
            raise ValueError('Invalid Max Attenuator Value')

        return self.instrument.max_atten - self.offset


class AttenuatorGroup(object):
    """An abstraction for groups of attenuators that will share behavior.

    Attenuator groups are intended to further facilitate abstraction of testing
    functions from the physical objects underlying them. By adding attenuators
    to a group, it is possible to operate on functional groups that can be
    thought of in a common manner in the test. This class is intended to provide
    convenience to the user and avoid re-implementation of helper functions and
    small loops scattered throughout user code.
    """

    def __init__(self, name=''):
        """This constructor for AttenuatorGroup

        Args:
            name: An optional parameter intended to further facilitate the
                passing of easily tracked groups of attenuators throughout code.
                It is left to the user to use the name in a way that meets their
                needs.
        """
        self.name = name
        self.attens = []
        self._value = 0

    def add_from_instrument(self, instrument, indices):
        """Adds attenuators of an AttenuatorInstrument to the group.

        This function will create Attenuator objects for all of the indices
        passed in and add them to the group.

        Args:
            instrument: the AttenuatorInstrument to pull attenuators from.
            indices: The index or indices to add to the group. Either a
                range, a list, a tuple, or a single integer.

        Raises:
            TypeError if instrument is not a valid AttenuatorInstrument, or
            if indices is not an int, range, list, or tuple.
            IndexError if any index is out of range for the instrument.
        """
        if not instrument or not isinstance(instrument, AttenuatorInstrument):
            raise TypeError('Must provide an Attenuator Instrument Ref')

        if isinstance(indices, int):
            indices = [indices]
        elif not isinstance(indices, (range, list, tuple)):
            # Fail loudly instead of silently adding nothing to the group.
            raise TypeError(
                'indices must be an int, range, list, or tuple of ints')

        for i in indices:
            self.attens.append(Attenuator(instrument, i))

    def add(self, attenuator):
        """Adds an already constructed Attenuator object to this group.

        Args:
            attenuator: An Attenuator object.

        Raises:
            TypeError if the attenuator parameter is not an Attenuator.
        """
        if not isinstance(attenuator, Attenuator):
            raise TypeError('Must provide an Attenuator')

        self.attens.append(attenuator)

    def synchronize(self):
        """Sets all grouped attenuators to the group's attenuation value."""
        self.set_atten(self._value)

    def is_synchronized(self):
        """Returns true if all attenuators have the synchronized value."""
        return all(att.get_atten() == self._value for att in self.attens)

    def set_atten(self, value, strict=True):
        """Sets the attenuation value of all attenuators in the group.

        The group's stored value is updated only after every attenuator has
        been set without raising.

        Args:
            value: A floating point value for nominal attenuation to be set.
            strict: Forwarded to each Attenuator.set_atten call. If True, out
                of bounds values raise an error; if False they are set to 0
                or max_atten.
        """
        value = float(value)
        for att in self.attens:
            att.set_atten(value, strict)
        self._value = value

    def get_atten(self):
        """Returns the current attenuation setting of AttenuatorGroup."""
        return float(self._value)