#!/usr/bin/python # Copyright (c) 2013 The Chromium OS Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. import copy import mock import prologix_scpi_driver import scpi import unittest import cellular_logging import cellular_system_error log = cellular_logging.SetupCellularLogging('scpi_test') # TODO:(byronk): # a hack for now. Should look this up in labconfig_data. crbug.com/225108 # TODO:(byronk): # replace SystemError with a specific exception crbug.com/225127 scpi_instruments = [ # Agilent 8960 call box {'name_part': "8960", 'gpib_addr': '14', 'ip': '172.22.50.118'}, # PXT is called 6621 {'name_part': "6621", 'gpib_addr': '14', 'ip': "172.22.50.244"} ] class BasicPrologixTest(unittest.TestCase): """ Basic connection test """ def test_bad_ip_address(self): """ Connect to the wrong port and check for the right error message. """ instr = copy.copy(scpi_instruments[0]) instr['ip'] = '192.168.0.0' # str(int(instr['gpib_addr'])+1) log.debug(instr) with self.assertRaises(Exception) as ex: self._get_idns_and_verify(instruments=[instr], opc=True) self.assertIsInstance(ex.exception, cellular_system_error.SocketTimeout) def test_ConnectToPortSuccess(self): """ Make a socket connection """ s = scpi_instruments[0] prologix_scpi_driver.connect_to_port(s['ip'], 1234, 5) def test_ConnectToPortBadIP(self): """ Make a socket connection """ with self.assertRaises(Exception) as ex: prologix_scpi_driver.connect_to_port('192.168.255.111', 1234, 1) self.assertIsInstance(ex.exception, cellular_system_error.SocketTimeout) def test_BadGpibAddress(self): """ How does the code behave if we can't connect. """ instr = copy.copy(scpi_instruments[0]) instr['gpib_addr'] = 9 # str(int(instr['gpib_addr'])+1) with self.assertRaises(Exception) as ex: self._get_idns_and_verify(instruments=[instr], opc=True) self.assertIsInstance(ex.exception, cellular_system_error.InstrumentTimeout) @mock.patch.object(prologix_scpi_driver.PrologixScpiDriver, '_DirectQuery') def test_NonClearReadBufferBeforeInit(self, patched_driver): """ Sometimes the Prologix box will have junk in it's read buffer There is code to read the junk out until setting the ++addr works. Test that here. """ s = scpi_instruments[0] patched_driver.side_effect = ['junk1', 'junk2', s['gpib_addr']] driver = prologix_scpi_driver.PrologixScpiDriver( hostname=s['ip'], port=1234, gpib_address=s['gpib_addr'], read_timeout_seconds=2) def test_Reset(self): for instr in scpi_instruments: scpi_connection = self._open_prologix(instr, opc_on_stanza=True, read_timeout_seconds=20) scpi_connection.Reset() self.scpi_connection.Close() def test_SimpleVerify(self): """ call SimpleVerify. """ # TODO(byronk): make sure this test only runs on the 8960. This # command doesn't work on other boxes for instr in scpi_instruments[:1]: assert instr['name_part'] == '8960' scpi_connection = self._open_prologix(instr, opc_on_stanza=True, read_timeout_seconds=2) # Check to see if the power state is off. # setting most instrument to off should be ok. scpi_connection.SimpleVerify('call:ms:pow:targ', '+0') self.scpi_connection.Close() def test_FetchErrors(self): """ call FetchErrors """ for instr in scpi_instruments: scpi_connection = self._open_prologix(instr, opc_on_stanza=True, read_timeout_seconds=2) scpi_connection._WaitAndFetchErrors() self.scpi_connection.Close() def test_BadScpiCommand(self): """ Send a bad command. We should fail gracefully. """ for instr in scpi_instruments: scpi_connection = self._open_prologix(instr, opc_on_stanza=True, read_timeout_seconds=1) try: scpi_connection.Query('*IDN') except cellular_system_error.InstrumentTimeout: assert \ "Should have raised a Instrument Timeout on a bad SCPI command" def test_ErrorCheckerContextAndStanzaSendingOpcFalse(self): """ Send a stanza, which uses the context manager """ for instr in scpi_instruments: scpi_connection = self._open_prologix(instr, opc_on_stanza=False, read_timeout_seconds=5) scpi_connection.SendStanza(['*WAI']) scpi_connection.Close() def test_ErrorCheckerContextAndStanzaSendingOpcTrue(self): """ Send a stanza, which uses the context manager """ for instr in scpi_instruments: scpi_connection = self._open_prologix(instr, opc_on_stanza=True, read_timeout_seconds=5) scpi_connection.SendStanza(['*WAI']) scpi_connection.Close() def test_GetIdnOpcTrue(self): """ Test with opc True. OPC= operation complete. Asking this question *OPC? after commands blocks until the command finishes. This prevents us from sending commands faster then then the instrument can handle. True is usually the right setting. """ self._get_idns_and_verify(instruments=scpi_instruments, opc=True) def test_GetIdnOpcFalse(self): """ Now with OPC off. """ self._get_idns_and_verify(instruments=scpi_instruments, opc=False) def _open_prologix(self, instr, opc_on_stanza, read_timeout_seconds=2): """ Build the prologix object. """ ip_addr = instr['ip'] name_part = instr['name_part'] gpib_addr = instr['gpib_addr'] log.debug("trying %s at %s" % (name_part, ip_addr)) driver = prologix_scpi_driver.PrologixScpiDriver( hostname=ip_addr, port=1234, gpib_address=gpib_addr, read_timeout_seconds=read_timeout_seconds) self.scpi_connection = scpi.Scpi(driver) log.debug("setting opc to %s " % opc_on_stanza) self.scpi_connection.opc_on_stanza = opc_on_stanza return self.scpi_connection def _get_idns_and_verify(self, instruments, opc=False): """ Get the idn string from all the instruments, and check that it contains the desired substring. This is a quick sanity check only. """ for instr in instruments: scpi_connection = self._open_prologix(instr, opc_on_stanza=opc) response = scpi_connection.Query('*IDN?') log.debug("looking for %s in response string: %s " % (instr['name_part'], response)) assert instr['name_part'] in response self.scpi_connection.Close() if __name__ == '__main__': unittest.main()