1# Lint as: python2, python3 2# Copyright (c) 2013 The Chromium OS Authors. All rights reserved. 3# Use of this source code is governed by a BSD-style license that can be 4# found in the LICENSE file. 5 6 7import copy 8import unittest 9from unittest import mock 10 11from autotest_lib.client.cros.cellular import cellular_logging 12from autotest_lib.client.cros.cellular import cellular_system_error 13from autotest_lib.client.cros.cellular import prologix_scpi_driver 14from autotest_lib.client.cros.cellular import scpi 15 16log = cellular_logging.SetupCellularLogging('scpi_test') 17 18# TODO:(byronk): 19# a hack for now. Should look this up in labconfig_data. crbug.com/225108 20# TODO:(byronk): 21# replace SystemError with a specific exception crbug.com/225127 22 23scpi_instruments = [ 24 # Agilent 8960 call box 25 {'name_part': "8960", 'gpib_addr': '14', 'ip': '172.22.50.118'}, 26 # PXT is called 6621 27 {'name_part': "6621", 'gpib_addr': '14', 'ip': "172.22.50.244"} 28] 29 30 31class BasicPrologixTest(unittest.TestCase): 32 """ 33 Basic connection test 34 """ 35 36 def test_bad_ip_address(self): 37 """ 38 Connect to the wrong port and check for the right error message. 39 """ 40 instr = copy.copy(scpi_instruments[0]) 41 instr['ip'] = '192.168.0.0' # str(int(instr['gpib_addr'])+1) 42 log.debug(instr) 43 with self.assertRaises(Exception) as ex: 44 self._get_idns_and_verify(instruments=[instr], opc=True) 45 self.assertIsInstance(ex.exception, 46 cellular_system_error.SocketTimeout) 47 48 def test_ConnectToPortSuccess(self): 49 """ Make a socket connection """ 50 s = scpi_instruments[0] 51 prologix_scpi_driver.connect_to_port(s['ip'], 1234, 5) 52 53 def test_ConnectToPortBadIP(self): 54 """ Make a socket connection """ 55 with self.assertRaises(Exception) as ex: 56 prologix_scpi_driver.connect_to_port('192.168.255.111', 1234, 1) 57 self.assertIsInstance(ex.exception, 58 cellular_system_error.SocketTimeout) 59 60 def test_BadGpibAddress(self): 61 """ 62 How does the code behave if we can't connect. 63 """ 64 instr = copy.copy(scpi_instruments[0]) 65 instr['gpib_addr'] = 9 # str(int(instr['gpib_addr'])+1) 66 with self.assertRaises(Exception) as ex: 67 self._get_idns_and_verify(instruments=[instr], opc=True) 68 self.assertIsInstance(ex.exception, 69 cellular_system_error.InstrumentTimeout) 70 71 @mock.patch.object(prologix_scpi_driver.PrologixScpiDriver, '_DirectQuery') 72 def test_NonClearReadBufferBeforeInit(self, patched_driver): 73 """ 74 Sometimes the Prologix box will have junk in it's read buffer 75 There is code to read the junk out until setting the ++addr works. 76 Test that here. 77 """ 78 s = scpi_instruments[0] 79 patched_driver.side_effect = ['junk1', 'junk2', s['gpib_addr']] 80 driver = prologix_scpi_driver.PrologixScpiDriver( 81 hostname=s['ip'], 82 port=1234, 83 gpib_address=s['gpib_addr'], 84 read_timeout_seconds=2) 85 86 def test_Reset(self): 87 for instr in scpi_instruments: 88 scpi_connection = self._open_prologix(instr, opc_on_stanza=True, 89 read_timeout_seconds=20) 90 scpi_connection.Reset() 91 self.scpi_connection.Close() 92 93 def test_SimpleVerify(self): 94 """ 95 call SimpleVerify. 96 """ 97 # TODO(byronk): make sure this test only runs on the 8960. This 98 # command doesn't work on other boxes 99 for instr in scpi_instruments[:1]: 100 assert instr['name_part'] == '8960' 101 scpi_connection = self._open_prologix(instr, opc_on_stanza=True, 102 read_timeout_seconds=2) 103 # Check to see if the power state is off. 104 # setting most instrument to off should be ok. 105 scpi_connection.SimpleVerify('call:ms:pow:targ', '+0') 106 self.scpi_connection.Close() 107 108 def test_FetchErrors(self): 109 """ 110 call FetchErrors 111 """ 112 for instr in scpi_instruments: 113 scpi_connection = self._open_prologix(instr, opc_on_stanza=True, 114 read_timeout_seconds=2) 115 scpi_connection._WaitAndFetchErrors() 116 self.scpi_connection.Close() 117 118 def test_BadScpiCommand(self): 119 """ 120 Send a bad command. We should fail gracefully. 121 """ 122 for instr in scpi_instruments: 123 scpi_connection = self._open_prologix(instr, opc_on_stanza=True, 124 read_timeout_seconds=1) 125 try: 126 scpi_connection.Query('*IDN') 127 except cellular_system_error.InstrumentTimeout: 128 assert \ 129 "Should have raised a Instrument Timeout on a bad SCPI command" 130 131 def test_ErrorCheckerContextAndStanzaSendingOpcFalse(self): 132 """ 133 Send a stanza, which uses the context manager 134 """ 135 for instr in scpi_instruments: 136 scpi_connection = self._open_prologix(instr, opc_on_stanza=False, 137 read_timeout_seconds=5) 138 scpi_connection.SendStanza(['*WAI']) 139 scpi_connection.Close() 140 141 def test_ErrorCheckerContextAndStanzaSendingOpcTrue(self): 142 """ 143 Send a stanza, which uses the context manager 144 """ 145 for instr in scpi_instruments: 146 scpi_connection = self._open_prologix(instr, opc_on_stanza=True, 147 read_timeout_seconds=5) 148 scpi_connection.SendStanza(['*WAI']) 149 scpi_connection.Close() 150 151 def test_GetIdnOpcTrue(self): 152 """ 153 Test with opc True. OPC= operation complete. Asking this 154 question *OPC? after commands blocks until the command finishes. 155 This prevents us from sending commands faster then then the 156 instrument can handle. True is usually the right setting. 157 158 """ 159 self._get_idns_and_verify(instruments=scpi_instruments, opc=True) 160 161 def test_GetIdnOpcFalse(self): 162 """ 163 Now with OPC off. 164 """ 165 self._get_idns_and_verify(instruments=scpi_instruments, opc=False) 166 167 def _open_prologix(self, instr, opc_on_stanza, read_timeout_seconds=2): 168 """ 169 Build the prologix object. 170 """ 171 ip_addr = instr['ip'] 172 name_part = instr['name_part'] 173 gpib_addr = instr['gpib_addr'] 174 log.debug("trying %s at %s" % (name_part, ip_addr)) 175 driver = prologix_scpi_driver.PrologixScpiDriver( 176 hostname=ip_addr, 177 port=1234, 178 gpib_address=gpib_addr, 179 read_timeout_seconds=read_timeout_seconds) 180 self.scpi_connection = scpi.Scpi(driver) 181 log.debug("setting opc to %s " % opc_on_stanza) 182 self.scpi_connection.opc_on_stanza = opc_on_stanza 183 return self.scpi_connection 184 185 def _get_idns_and_verify(self, instruments, opc=False): 186 """ 187 Get the idn string from all the instruments, and check that it 188 contains the desired substring. This is a quick confidence check only. 189 """ 190 for instr in instruments: 191 scpi_connection = self._open_prologix(instr, opc_on_stanza=opc) 192 response = scpi_connection.Query('*IDN?') 193 log.debug("looking for %s in response string: %s " % 194 (instr['name_part'], response)) 195 assert instr['name_part'] in response 196 self.scpi_connection.Close() 197 198if __name__ == '__main__': 199 unittest.main() 200