# Lint as: python2, python3 # Copyright (c) 2013 The Chromium OS Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. from autotest_lib.client.cros.cellular import cellular_logging from autotest_lib.client.cros.cellular import cellular_system_error log = cellular_logging.SetupCellularLogging('scpi_driver') class _ErrorCheckerContext(object): """Reference-count our error-checking state and only check for errors when we take the first ref or drop the last ref. This way, we can minimize the number of checks; each one takes a bit of time. You will likely want to set always_check to True when debugging new SCPI interactions. On first entry, we check for errors, but do not stop if we find them; these are errors that were accumulated on the device before this test ran. """ def __init__(self, scpi): self.always_check = True # True for serious debugging self.scpi = scpi self.depth = 0 self.raise_on_error = True def __enter__(self): log.debug('ErrorCheckerContext Depth: %s' % self.depth) if self.depth == 0 or self.always_check: errors = self.scpi._WaitAndFetchErrors( raise_on_error=False) # Never raise when clearing old errors self.depth += 1 return self def __exit__(self, type, value, traceback): self.depth -= 1 if self.depth <= 0 or self.always_check: self.scpi._WaitAndFetchErrors() return class Scpi(object): """Wrapper for SCPI. SCPI = "standard commands for programmable instruments", a relative of GPIB. The SCPI driver must export: Query, Send, Reset and Close """ def __init__(self, driver, opc_on_stanza=False): self.driver = driver self.opc_on_stanza = opc_on_stanza self.checker_context = _ErrorCheckerContext(self) def Query(self, command): """Send the SCPI command and return the response.""" response = self.driver.Query(command) return response def Send(self, command): """Send the SCPI command.""" self.driver.Send(command) def Reset(self): """Tell the device to reset with *RST.""" # Some devices (like the prologix) require special handling for # reset. self.driver.Reset() def Close(self): """Close the device.""" self.driver.Close() def RetrieveErrors(self): """Retrieves all SYSTem:ERRor messages from the device.""" errors = [] while True: error = self.Query('SYSTem:ERRor?') if '+0,"No error"' in error: # We've reached the end of the error stack break if '-420' in error and 'Query UNTERMINATED' in error: # This is benign; the GPIB bridge asked for a response when # the device didn't have one to give. # TODO(rochberg): This is a layering violation; we should # really only accept -420 if the underlying driver is in a # mode that is known to cause this continue if '+292' in error and 'Data arrived on unknown SAPI' in error: # This may be benign; It is known to occur when we do a switch # from GPRS to WCDMA continue errors.append(error) self.Send('*CLS') # Clear status errors.reverse() return errors def _WaitAndFetchErrors(self, raise_on_error=True): """Waits for command completion, returns errors.""" self.Query('*OPC?') # Wait for operation complete errors = self.RetrieveErrors() if errors and raise_on_error: raise cellular_system_error.BadScpiCommand('\n'.join(errors)) return errors def SimpleVerify(self, command, arg): """Sends "command arg", then "command?", expecting arg back. Arguments: command: SCPI command arg: Argument. We currently check for exact equality: you should send strings quoted with " because that's what the 8960 returns. We also fail if you send 1 and receive +1 back. Raises: Error: Verification failed """ self.always_check = False with self.checker_context: self.Send('%s %s' % (command, arg)) result = self.Query('%s?' % (command,)) if result != arg: raise cellular_system_error.BadScpiCommand( 'Error on %s: sent %s, got %s' % (command, arg, result)) def SendStanza(self, commands): """ Sends a list of commands and verifies that they complete correctly. """ with self.checker_context: for c in commands: if self.opc_on_stanza: self.Send(c) self.Query('*OPC?') else: self.Send(c)