1#!/usr/bin/python2 2# Copyright (c) 2013 The Chromium OS Authors. All rights reserved. 3# Use of this source code is governed by a BSD-style license that can be 4# found in the LICENSE file. 5 6import cellular_logging 7import cellular_system_error 8 9log = cellular_logging.SetupCellularLogging('scpi_driver') 10 11 12class _ErrorCheckerContext(object): 13 """Reference-count our error-checking state and only check for 14 errors when we take the first ref or drop the last ref. 15 16 This way, we can minimize the number of checks; each one takes a 17 bit of time. You will likely want to set always_check to True when 18 debugging new SCPI interactions. 19 20 On first entry, we check for errors, but do not stop if we find 21 them; these are errors that were accumulated on the device before 22 this test ran. 23 """ 24 25 def __init__(self, scpi): 26 self.always_check = True # True for serious debugging 27 self.scpi = scpi 28 self.depth = 0 29 self.raise_on_error = True 30 31 def __enter__(self): 32 log.debug('ErrorCheckerContext Depth: %s' % self.depth) 33 if self.depth == 0 or self.always_check: 34 errors = self.scpi._WaitAndFetchErrors( 35 raise_on_error=False) # Never raise when clearing old errors 36 self.depth += 1 37 return self 38 39 def __exit__(self, type, value, traceback): 40 self.depth -= 1 41 if self.depth <= 0 or self.always_check: 42 self.scpi._WaitAndFetchErrors() 43 return 44 45 46class Scpi(object): 47 """Wrapper for SCPI. 48 49 SCPI = "standard commands for programmable instruments", 50 a relative of GPIB. 51 52 The SCPI driver must export: Query, Send, Reset and Close 53 """ 54 55 def __init__(self, driver, opc_on_stanza=False): 56 self.driver = driver 57 self.opc_on_stanza = opc_on_stanza 58 self.checker_context = _ErrorCheckerContext(self) 59 60 def Query(self, command): 61 """Send the SCPI command and return the response.""" 62 response = self.driver.Query(command) 63 return response 64 65 def Send(self, command): 66 """Send the SCPI command.""" 67 self.driver.Send(command) 68 69 def Reset(self): 70 """Tell the device to reset with *RST.""" 71 # Some devices (like the prologix) require special handling for 72 # reset. 73 self.driver.Reset() 74 75 def Close(self): 76 """Close the device.""" 77 self.driver.Close() 78 79 def RetrieveErrors(self): 80 """Retrieves all SYSTem:ERRor messages from the device.""" 81 errors = [] 82 while True: 83 error = self.Query('SYSTem:ERRor?') 84 if '+0,"No error"' in error: 85 # We've reached the end of the error stack 86 break 87 88 if '-420' in error and 'Query UNTERMINATED' in error: 89 # This is benign; the GPIB bridge asked for a response when 90 # the device didn't have one to give. 91 92 # TODO(rochberg): This is a layering violation; we should 93 # really only accept -420 if the underlying driver is in a 94 # mode that is known to cause this 95 continue 96 97 if '+292' in error and 'Data arrived on unknown SAPI' in error: 98 # This may be benign; It is known to occur when we do a switch 99 # from GPRS to WCDMA 100 continue 101 102 errors.append(error) 103 104 self.Send('*CLS') # Clear status 105 errors.reverse() 106 return errors 107 108 def _WaitAndFetchErrors(self, raise_on_error=True): 109 """Waits for command completion, returns errors.""" 110 self.Query('*OPC?') # Wait for operation complete 111 errors = self.RetrieveErrors() 112 if errors and raise_on_error: 113 raise cellular_system_error.BadScpiCommand('\n'.join(errors)) 114 return errors 115 116 def SimpleVerify(self, command, arg): 117 """Sends "command arg", then "command?", expecting arg back. 118 119 Arguments: 120 command: SCPI command 121 arg: Argument. We currently check for exact equality: you should 122 send strings quoted with " because that's what the 8960 returns. 123 We also fail if you send 1 and receive +1 back. 124 125 Raises: 126 Error: Verification failed 127 """ 128 self.always_check = False 129 with self.checker_context: 130 self.Send('%s %s' % (command, arg)) 131 result = self.Query('%s?' % (command,)) 132 if result != arg: 133 raise cellular_system_error.BadScpiCommand( 134 'Error on %s: sent %s, got %s' % (command, arg, result)) 135 136 def SendStanza(self, commands): 137 """ 138 Sends a list of commands and verifies that they complete correctly. 139 """ 140 with self.checker_context: 141 for c in commands: 142 if self.opc_on_stanza: 143 self.Send(c) 144 self.Query('*OPC?') 145 else: 146 self.Send(c) 147