• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Lint as: python2, python3
2# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6from autotest_lib.client.cros.cellular import cellular_logging
7from autotest_lib.client.cros.cellular import cellular_system_error
8
9log = cellular_logging.SetupCellularLogging('scpi_driver')
10
11
12class _ErrorCheckerContext(object):
13    """Reference-count our error-checking state and only check for
14    errors when we take the first ref or drop the last ref.
15
16    This way, we can minimize the number of checks; each one takes a
17    bit of time.  You will likely want to set always_check to True when
18    debugging new SCPI interactions.
19
20    On first entry, we check for errors, but do not stop if we find
21    them; these are errors that were accumulated on the device before
22    this test ran.
23    """
24
25    def __init__(self, scpi):
26        self.always_check = True  # True for serious debugging
27        self.scpi = scpi
28        self.depth = 0
29        self.raise_on_error = True
30
31    def __enter__(self):
32        log.debug('ErrorCheckerContext Depth: %s' % self.depth)
33        if self.depth == 0 or self.always_check:
34            errors = self.scpi._WaitAndFetchErrors(
35                raise_on_error=False)  # Never raise when clearing old errors
36        self.depth += 1
37        return self
38
39    def __exit__(self, type, value, traceback):
40        self.depth -= 1
41        if self.depth <= 0 or self.always_check:
42            self.scpi._WaitAndFetchErrors()
43        return
44
45
class Scpi(object):
    """Wrapper for SCPI.

    SCPI = "standard commands for programmable instruments",
    a relative of GPIB.

    The SCPI driver must export:  Query, Send, Reset and Close
    """

    def __init__(self, driver, opc_on_stanza=False):
        """Wraps a transport driver.

        Arguments:
          driver: Object exporting Query, Send, Reset and Close.
          opc_on_stanza: If true, SendStanza issues '*OPC?' after every
            command it sends.
        """
        self.driver = driver
        self.opc_on_stanza = opc_on_stanza
        self.checker_context = _ErrorCheckerContext(self)

    def Query(self, command):
        """Send the SCPI command and return the response."""
        return self.driver.Query(command)

    def Send(self, command):
        """Send the SCPI command."""
        self.driver.Send(command)

    def Reset(self):
        """Tell the device to reset with *RST."""
        # Some devices (like the prologix) require special handling for
        # reset.
        self.driver.Reset()

    def Close(self):
        """Close the device."""
        self.driver.Close()

    def RetrieveErrors(self):
        """Retrieves all SYSTem:ERRor messages from the device.

        Queries 'SYSTem:ERRor?' until a reply containing '+0,"No error"'
        is seen, skipping the known-benign -420 and +292 messages, then
        sends '*CLS'.

        Returns:
          List of the remaining error replies, in reverse order of
          retrieval (last one retrieved comes first).
        """
        # NOTE(review): the loop only ends on a reply containing
        # '+0,"No error"'; a driver that never produces one would keep
        # this spinning -- confirm drivers always terminate the queue.
        errors = []
        while True:
            error = self.Query('SYSTem:ERRor?')
            if '+0,"No error"' in error:
                # We've reached the end of the error stack
                break

            if '-420' in error and 'Query UNTERMINATED' in error:
                # This is benign; the GPIB bridge asked for a response when
                # the device didn't have one to give.

                # TODO(rochberg): This is a layering violation; we should
                # really only accept -420 if the underlying driver is in a
                # mode that is known to cause this
                continue

            if '+292' in error and 'Data arrived on unknown SAPI' in error:
                # This may be benign; It is known to occur when we do a switch
                # from GPRS to WCDMA
                continue

            errors.append(error)

        self.Send('*CLS')           # Clear status
        errors.reverse()
        return errors

    def _WaitAndFetchErrors(self, raise_on_error=True):
        """Waits for command completion, returns errors.

        Arguments:
          raise_on_error: If true, raise when any error was retrieved.

        Returns:
          The list produced by RetrieveErrors.

        Raises:
          cellular_system_error.BadScpiCommand: errors were retrieved and
            raise_on_error is true; the message is the errors joined by
            newlines.
        """
        self.Query('*OPC?')      # Wait for operation complete
        errors = self.RetrieveErrors()
        if errors and raise_on_error:
            raise cellular_system_error.BadScpiCommand('\n'.join(errors))
        return errors

    def SimpleVerify(self, command, arg):
        """Sends "command arg", then "command?", expecting arg back.

        Arguments:
          command: SCPI command
          arg: Argument.  The reply is checked for exact equality with
            the text that was sent for arg: you should send strings
            quoted with " because that's what the 8960 returns.
            We also fail if you send 1 and receive +1 back.

        Raises:
          cellular_system_error.BadScpiCommand:  Verification failed
        """
        # NOTE(review): this sets an attribute on the Scpi object, not on
        # self.checker_context, so the checker's always_check flag is left
        # untouched.  Kept for compatibility -- confirm what was intended.
        self.always_check = False
        # Compare the reply with the text that actually goes to the device,
        # so a non-string arg (e.g. the int 1, sent as "1") can verify.
        expected = '%s' % (arg,)
        with self.checker_context:
            self.Send('%s %s' % (command, expected))
            result = self.Query('%s?' % (command,))
            if result != expected:
                raise cellular_system_error.BadScpiCommand(
                    'Error on %s: sent %s, got %s' % (command, expected,
                                                      result))

    def SendStanza(self, commands):
        """
        Sends a list of commands and verifies that they complete correctly.

        Arguments:
          commands: Iterable of SCPI command strings, sent in order inside
            the error-checking context.
        """
        with self.checker_context:
            for command in commands:
                self.Send(command)
                if self.opc_on_stanza:
                    # Wait for operation complete before the next command
                    self.Query('*OPC?')
147