• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Lint as: python2, python3
2# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6
7import copy
8import unittest
9from unittest import mock
10
11from autotest_lib.client.cros.cellular import cellular_logging
12from autotest_lib.client.cros.cellular import cellular_system_error
13from autotest_lib.client.cros.cellular import prologix_scpi_driver
14from autotest_lib.client.cros.cellular import scpi
15
# Logging handle used by the tests in this file.
log = cellular_logging.SetupCellularLogging('scpi_test')

# TODO(byronk): a hack for now. Should look this up in labconfig_data.
# crbug.com/225108
# TODO(byronk): replace SystemError with a specific exception.
# crbug.com/225127

# One entry per instrument the tests connect to:
#   name_part -- substring the tests look for in the '*IDN?' response
#   gpib_addr -- GPIB address handed to the Prologix driver
#   ip        -- host the Prologix driver connects to
scpi_instruments = [
    # Agilent 8960 call box
    dict(name_part='8960', gpib_addr='14', ip='172.22.50.118'),
    # PXT is called 6621
    dict(name_part='6621', gpib_addr='14', ip='172.22.50.244'),
]
29
30
class BasicPrologixTest(unittest.TestCase):
    """
    Basic connection test

    The tests open connections to the hosts listed in scpi_instruments;
    only test_NonClearReadBufferBeforeInit patches part of the driver.
    """

    # TCP port handed to the Prologix driver by every test in this class.
    _PROLOGIX_PORT = 1234

    def test_bad_ip_address(self):
        """
        Connect to the wrong IP address and check for the right error.
        """
        # Copy so the shared module-level instrument entry is not modified.
        instr = copy.copy(scpi_instruments[0])
        instr['ip'] = '192.168.0.0'
        log.debug(instr)
        with self.assertRaises(cellular_system_error.SocketTimeout):
            self._get_idns_and_verify(instruments=[instr], opc=True)

    def test_ConnectToPortSuccess(self):
        """ Make a socket connection """
        s = scpi_instruments[0]
        # NOTE(review): the last argument is presumably a timeout in
        # seconds -- confirm against connect_to_port.
        prologix_scpi_driver.connect_to_port(s['ip'], self._PROLOGIX_PORT, 5)

    def test_ConnectToPortBadIP(self):
        """ A socket connection to a bad IP must raise SocketTimeout """
        with self.assertRaises(cellular_system_error.SocketTimeout):
            prologix_scpi_driver.connect_to_port('192.168.255.111',
                                                 self._PROLOGIX_PORT, 1)

    def test_BadGpibAddress(self):
        """
        How does the code behave if we can't connect.
        """
        # Copy so the shared module-level instrument entry is not modified.
        instr = copy.copy(scpi_instruments[0])
        # NOTE(review): presumably no instrument answers at GPIB address 9;
        # the other entries use string addresses -- confirm the int is
        # intentional.
        instr['gpib_addr'] = 9
        with self.assertRaises(cellular_system_error.InstrumentTimeout):
            self._get_idns_and_verify(instruments=[instr], opc=True)

    @mock.patch.object(prologix_scpi_driver.PrologixScpiDriver, '_DirectQuery')
    def test_NonClearReadBufferBeforeInit(self, patched_driver):
        """
        Sometimes the Prologix box will have junk in its read buffer.
        There is code to read the junk out until setting the ++addr works.
        Test that here.

        @param patched_driver: mock standing in for
                PrologixScpiDriver._DirectQuery.
        """
        s = scpi_instruments[0]
        # Two junk replies, then the reply that matches the GPIB address.
        patched_driver.side_effect = ['junk1', 'junk2', s['gpib_addr']]
        # Constructing the driver is the test: it must get through the junk
        # replies without raising.
        prologix_scpi_driver.PrologixScpiDriver(
            hostname=s['ip'],
            port=self._PROLOGIX_PORT,
            gpib_address=s['gpib_addr'],
            read_timeout_seconds=2)

    def test_Reset(self):
        """
        call Reset on every instrument.
        """
        for instr in scpi_instruments:
            scpi_connection = self._open_prologix(instr, opc_on_stanza=True,
                                                  read_timeout_seconds=20)
            try:
                scpi_connection.Reset()
            finally:
                # Close even when Reset fails so the connection is not leaked.
                scpi_connection.Close()

    def test_SimpleVerify(self):
        """
        call SimpleVerify.
        """
        # TODO(byronk): make sure this test only runs on the 8960. This
        # command doesn't work on other boxes
        for instr in scpi_instruments[:1]:
            self.assertEqual(instr['name_part'], '8960')
            scpi_connection = self._open_prologix(instr, opc_on_stanza=True,
                                                  read_timeout_seconds=2)
            try:
                # Check to see if the power state is off.
                # setting most instrument to off should be ok.
                scpi_connection.SimpleVerify('call:ms:pow:targ', '+0')
            finally:
                scpi_connection.Close()

    def test_FetchErrors(self):
        """
        call FetchErrors
        """
        for instr in scpi_instruments:
            scpi_connection = self._open_prologix(instr, opc_on_stanza=True,
                                                  read_timeout_seconds=2)
            try:
                scpi_connection._WaitAndFetchErrors()
            finally:
                scpi_connection.Close()

    def test_BadScpiCommand(self):
        """
        Send a bad command. We should fail gracefully.

        '*IDN' (without the '?') is not a valid query, so Query is expected
        to raise InstrumentTimeout.
        """
        for instr in scpi_instruments:
            scpi_connection = self._open_prologix(instr, opc_on_stanza=True,
                                                  read_timeout_seconds=1)
            try:
                # The test must fail when no timeout is raised.
                with self.assertRaises(
                        cellular_system_error.InstrumentTimeout):
                    scpi_connection.Query('*IDN')
            finally:
                scpi_connection.Close()

    def test_ErrorCheckerContextAndStanzaSendingOpcFalse(self):
        """
        Send a stanza, which uses the context manager
        """
        for instr in scpi_instruments:
            scpi_connection = self._open_prologix(instr, opc_on_stanza=False,
                                                  read_timeout_seconds=5)
            try:
                scpi_connection.SendStanza(['*WAI'])
            finally:
                scpi_connection.Close()

    def test_ErrorCheckerContextAndStanzaSendingOpcTrue(self):
        """
        Send a stanza, which uses the context manager
        """
        for instr in scpi_instruments:
            scpi_connection = self._open_prologix(instr, opc_on_stanza=True,
                                                  read_timeout_seconds=5)
            try:
                scpi_connection.SendStanza(['*WAI'])
            finally:
                scpi_connection.Close()

    def test_GetIdnOpcTrue(self):
        """
        Test with opc True. OPC= operation complete. Asking this
        question *OPC? after commands blocks until the command finishes.
        This prevents us from sending commands faster than the
        instrument can handle. True is usually the right setting.

        """
        self._get_idns_and_verify(instruments=scpi_instruments, opc=True)

    def test_GetIdnOpcFalse(self):
        """
        Now with OPC off.
        """
        self._get_idns_and_verify(instruments=scpi_instruments, opc=False)

    def _open_prologix(self, instr, opc_on_stanza, read_timeout_seconds=2):
        """
        Build the prologix object.

        @param instr: dict with 'ip', 'name_part' and 'gpib_addr' keys.
        @param opc_on_stanza: value stored on the connection's
                opc_on_stanza attribute.
        @param read_timeout_seconds: read timeout passed to the driver.
        @return the scpi.Scpi connection, also kept as self.scpi_connection.
        """
        ip_addr = instr['ip']
        name_part = instr['name_part']
        gpib_addr = instr['gpib_addr']
        log.debug("trying %s at %s" % (name_part, ip_addr))
        driver = prologix_scpi_driver.PrologixScpiDriver(
            hostname=ip_addr,
            port=self._PROLOGIX_PORT,
            gpib_address=gpib_addr,
            read_timeout_seconds=read_timeout_seconds)
        self.scpi_connection = scpi.Scpi(driver)
        log.debug("setting opc to %s " % opc_on_stanza)
        self.scpi_connection.opc_on_stanza = opc_on_stanza
        return self.scpi_connection

    def _get_idns_and_verify(self, instruments, opc=False):
        """
        Get the idn string from all the instruments, and check that it
        contains the desired substring. This is a quick confidence check only.

        @param instruments: iterable of instrument dicts (see _open_prologix).
        @param opc: passed to _open_prologix as opc_on_stanza.
        """
        for instr in instruments:
            scpi_connection = self._open_prologix(instr, opc_on_stanza=opc)
            try:
                response = scpi_connection.Query('*IDN?')
                log.debug("looking for %s  in response string: %s " %
                          (instr['name_part'], response))
                self.assertIn(instr['name_part'], response)
            finally:
                # Close even when the query or the check fails.
                scpi_connection.Close()
197
# Run the tests in this file when it is executed directly as a script.
if __name__ == "__main__":
    unittest.main()
200