• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/python
2# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6
7import copy
8import mock
9import prologix_scpi_driver
10import scpi
11import unittest
12import cellular_logging
13import cellular_system_error
14
# Module-level logger used by the tests below for debug output.
log = cellular_logging.SetupCellularLogging('scpi_test')
16
# TODO(byronk): this table is a hack for now; it should be looked up in
# labconfig_data. crbug.com/225108
# TODO(byronk): replace SystemError with a specific exception.
# crbug.com/225127

# Instruments exercised by the tests. Each entry holds:
#   name_part: substring the tests look for in the '*IDN?' response
#   gpib_addr: GPIB address handed to the Prologix driver
#   ip:        address handed to the Prologix driver as its hostname
scpi_instruments = [
    # Agilent 8960 call box
    dict(name_part='8960', gpib_addr='14', ip='172.22.50.118'),
    # PXT is called 6621
    dict(name_part='6621', gpib_addr='14', ip='172.22.50.244'),
]
28
29
class BasicPrologixTest(unittest.TestCase):
    """
    Basic connection tests for the Prologix SCPI driver.

    Most tests open a connection to the addresses listed in
    scpi_instruments through _open_prologix() and close it again before
    moving on to the next instrument.
    """

    def test_bad_ip_address(self):
        """
        Connect to the wrong IP address and check for the right error.
        """
        # Copy the entry so the shared module-level table is not modified.
        instr = copy.copy(scpi_instruments[0])
        instr['ip'] = '192.168.0.0'
        log.debug(instr)
        with self.assertRaises(cellular_system_error.SocketTimeout):
            self._get_idns_and_verify(instruments=[instr], opc=True)

    def test_ConnectToPortSuccess(self):
        """ Make a socket connection """
        s = scpi_instruments[0]
        prologix_scpi_driver.connect_to_port(s['ip'], 1234, 5)

    def test_ConnectToPortBadIP(self):
        """ A socket connection to an unused IP address must time out """
        with self.assertRaises(cellular_system_error.SocketTimeout):
            prologix_scpi_driver.connect_to_port('192.168.255.111', 1234, 1)

    def test_BadGpibAddress(self):
        """
        How does the code behave if we can't connect.
        """
        # Copy the entry so the shared module-level table is not modified.
        instr = copy.copy(scpi_instruments[0])
        # NOTE(review): presumably no instrument answers on GPIB address 9;
        # the table stores addresses as strings while this is an int --
        # confirm that is intended.
        instr['gpib_addr'] = 9
        with self.assertRaises(cellular_system_error.InstrumentTimeout):
            self._get_idns_and_verify(instruments=[instr], opc=True)

    @mock.patch.object(prologix_scpi_driver.PrologixScpiDriver, '_DirectQuery')
    def test_NonClearReadBufferBeforeInit(self, patched_driver):
        """
        Sometimes the Prologix box will have junk in its read buffer.
        There is code to read the junk out until setting the ++addr works.
        Test that here.
        """
        s = scpi_instruments[0]
        # Two junk replies, then the reply that matches the GPIB address.
        patched_driver.side_effect = ['junk1', 'junk2', s['gpib_addr']]
        # Constructing the driver is the test: it must not raise.
        prologix_scpi_driver.PrologixScpiDriver(
            hostname=s['ip'],
            port=1234,
            gpib_address=s['gpib_addr'],
            read_timeout_seconds=2)

    def test_Reset(self):
        """
        Call Reset on every instrument.
        """
        for instr in scpi_instruments:
            scpi_connection = self._open_prologix(instr, opc_on_stanza=True,
                                                  read_timeout_seconds=20)
            try:
                scpi_connection.Reset()
            finally:
                # Close even when the call above fails.
                scpi_connection.Close()

    def test_SimpleVerify(self):
        """
        call SimpleVerify.
        """
        # TODO(byronk): make sure this test only runs on the 8960. This
        # command doesn't work on other boxes
        for instr in scpi_instruments[:1]:
            self.assertEqual(instr['name_part'], '8960')
            scpi_connection = self._open_prologix(instr, opc_on_stanza=True,
                                                  read_timeout_seconds=2)
            try:
                # Check to see if the power state is off.
                # setting most instrument to off should be ok.
                scpi_connection.SimpleVerify('call:ms:pow:targ', '+0')
            finally:
                scpi_connection.Close()

    def test_FetchErrors(self):
        """
        call FetchErrors
        """
        for instr in scpi_instruments:
            scpi_connection = self._open_prologix(instr, opc_on_stanza=True,
                                                  read_timeout_seconds=2)
            try:
                scpi_connection._WaitAndFetchErrors()
            finally:
                scpi_connection.Close()

    def test_BadScpiCommand(self):
        """
        Send a bad command. We should fail gracefully, i.e. with an
        InstrumentTimeout.
        """
        for instr in scpi_instruments:
            scpi_connection = self._open_prologix(instr, opc_on_stanza=True,
                                                  read_timeout_seconds=1)
            try:
                # '*IDN' lacks the trailing '?', so it is not a valid query.
                # The test fails if no InstrumentTimeout is raised.
                with self.assertRaises(
                        cellular_system_error.InstrumentTimeout):
                    scpi_connection.Query('*IDN')
            finally:
                scpi_connection.Close()

    def test_ErrorCheckerContextAndStanzaSendingOpcFalse(self):
        """
        Send a stanza, which uses the context manager
        """
        for instr in scpi_instruments:
            scpi_connection = self._open_prologix(instr, opc_on_stanza=False,
                                                  read_timeout_seconds=5)
            try:
                scpi_connection.SendStanza(['*WAI'])
            finally:
                scpi_connection.Close()

    def test_ErrorCheckerContextAndStanzaSendingOpcTrue(self):
        """
        Send a stanza, which uses the context manager
        """
        for instr in scpi_instruments:
            scpi_connection = self._open_prologix(instr, opc_on_stanza=True,
                                                  read_timeout_seconds=5)
            try:
                scpi_connection.SendStanza(['*WAI'])
            finally:
                scpi_connection.Close()

    def test_GetIdnOpcTrue(self):
        """
        Test with opc True. OPC= operation complete. Asking this
        question *OPC? after commands blocks until the command finishes.
        This prevents us from sending commands faster than the
        instrument can handle. True is usually the right setting.
        """
        self._get_idns_and_verify(instruments=scpi_instruments, opc=True)

    def test_GetIdnOpcFalse(self):
        """
        Now with OPC off.
        """
        self._get_idns_and_verify(instruments=scpi_instruments, opc=False)

    def _open_prologix(self, instr, opc_on_stanza, read_timeout_seconds=2):
        """
        Build the prologix object.

        instr is one entry of scpi_instruments; its 'ip', 'name_part' and
        'gpib_addr' keys are read. opc_on_stanza is stored on the returned
        connection and read_timeout_seconds is passed to the driver.

        Returns the scpi.Scpi connection, which is also kept on
        self.scpi_connection.
        """
        ip_addr = instr['ip']
        name_part = instr['name_part']
        gpib_addr = instr['gpib_addr']
        log.debug("trying %s at %s" % (name_part, ip_addr))
        driver = prologix_scpi_driver.PrologixScpiDriver(
            hostname=ip_addr,
            port=1234,
            gpib_address=gpib_addr,
            read_timeout_seconds=read_timeout_seconds)
        self.scpi_connection = scpi.Scpi(driver)
        log.debug("setting opc to %s " % opc_on_stanza)
        self.scpi_connection.opc_on_stanza = opc_on_stanza
        return self.scpi_connection

    def _get_idns_and_verify(self, instruments, opc=False):
        """
        Get the idn string from all the instruments, and check that it
        contains the desired substring. This is a quick sanity check only.

        instruments is a list of entries shaped like those in
        scpi_instruments; opc is forwarded as opc_on_stanza.
        """
        for instr in instruments:
            scpi_connection = self._open_prologix(instr, opc_on_stanza=opc)
            try:
                response = scpi_connection.Query('*IDN?')
                log.debug("looking for %s  in response string: %s " %
                          (instr['name_part'], response))
                self.assertIn(instr['name_part'], response)
            finally:
                # Close even when the query or the check above fails.
                scpi_connection.Close()
196
# Run the test suite when this file is executed directly.
if __name__ == "__main__":
    unittest.main()
199